001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *  
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *  
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License. 
018 *  
019 */
020package org.apache.directory.server.core.partition.impl.btree.jdbm;
021
022
023import java.io.IOException;
024import java.util.Comparator;
025import java.util.Map;
026
027import jdbm.RecordManager;
028import jdbm.btree.BTree;
029import jdbm.helper.Serializer;
030import jdbm.helper.Tuple;
031import jdbm.helper.TupleBrowser;
032
033import org.apache.directory.api.ldap.model.cursor.Cursor;
034import org.apache.directory.api.ldap.model.cursor.CursorException;
035import org.apache.directory.api.ldap.model.cursor.EmptyCursor;
036import org.apache.directory.api.ldap.model.cursor.SingletonCursor;
037import org.apache.directory.api.ldap.model.exception.LdapException;
038import org.apache.directory.api.ldap.model.exception.LdapOtherException;
039import org.apache.directory.api.ldap.model.schema.SchemaManager;
040import org.apache.directory.api.ldap.model.schema.comparators.SerializableComparator;
041import org.apache.directory.api.util.Strings;
042import org.apache.directory.api.util.SynchronizedLRUMap;
043import org.apache.directory.server.core.api.partition.PartitionTxn;
044import org.apache.directory.server.core.avltree.ArrayMarshaller;
045import org.apache.directory.server.core.avltree.ArrayTree;
046import org.apache.directory.server.core.avltree.ArrayTreeCursor;
047import org.apache.directory.server.core.avltree.Marshaller;
048import org.apache.directory.server.i18n.I18n;
049import org.apache.directory.server.xdbm.AbstractTable;
050import org.apache.directory.server.xdbm.KeyTupleArrayCursor;
051import org.slf4j.Logger;
052import org.slf4j.LoggerFactory;
053
054
055/**
056 * A jdbm Btree wrapper that enables duplicate sorted keys using collections.
057 *
058 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
059 */
060public class JdbmTable<K, V> extends AbstractTable<K, V>
061{
062    /** A logger for this class */
063    private static final Logger LOG = LoggerFactory.getLogger( JdbmTable.class );
064
065    /** the JDBM record manager for the file this table is managed in */
066    private final RecordManager recMan;
067
068    /** the wrappedCursor JDBM btree used in this Table */
069    private BTree<K, V> bt;
070
071    /** the limit at which we start using btree redirection for duplicates */
072    private int numDupLimit = JdbmIndex.DEFAULT_DUPLICATE_LIMIT;
073
074    /** a cache of duplicate BTrees */
075    private final Map<Long, BTree<K, V>> duplicateBtrees;
076
077    /** A value serializer */
078    private final Serializer valueSerializer;
079
080    /** A marshaller used to serialize/deserialize values stored in the Table */
081    Marshaller<ArrayTree<V>> marshaller;
082
083
084    // ------------------------------------------------------------------------
085    // C O N S T R U C T O R
086    // ------------------------------------------------------------------------
087
088    /**
089     * Creates a Jdbm BTree based tuple Table abstraction that enables 
090     * duplicates.
091     *
092     * @param schemaManager The server schemaManager
093     * @param name the name of the table
094     * @param numDupLimit the size limit of duplicates before switching to BTrees for values instead of AvlTrees
095     * @param manager the record manager to be used for this table
096     * @param keyComparator a key comparator
097     * @param valueComparator a value comparator
098     * @param keySerializer a serializer to use for the keys instead of using
099     * default Java serialization which could be very expensive
100     * @param valueSerializer a serializer to use for the values instead of
101     * using default Java serialization which could be very expensive
102     * @throws IOException if the table's file cannot be created
103     */
104    @SuppressWarnings("unchecked")
105    public JdbmTable( SchemaManager schemaManager, String name, int numDupLimit, RecordManager manager,
106        Comparator<K> keyComparator, Comparator<V> valueComparator,
107        Serializer keySerializer, Serializer valueSerializer )
108        throws IOException
109    {
110        super( schemaManager, name, keyComparator, valueComparator );
111
112        if ( valueComparator == null )
113        {
114            throw new IllegalArgumentException( I18n.err( I18n.ERR_592 ) );
115        }
116
117        // TODO make the size of the duplicate btree cache configurable via constructor
118        duplicateBtrees = new SynchronizedLRUMap( 100 );
119
120        if ( valueSerializer != null )
121        {
122            marshaller = new ArrayMarshaller<>( valueComparator,
123                new MarshallerSerializerBridge<V>( valueSerializer ) );
124        }
125        else
126        {
127            marshaller = new ArrayMarshaller<>( valueComparator );
128        }
129
130        this.numDupLimit = numDupLimit;
131        this.recMan = manager;
132        this.valueSerializer = valueSerializer;
133        this.allowsDuplicates = true;
134        long recId = recMan.getNamedObject( name );
135
136        if ( recId == 0 ) // Create new main BTree
137        {
138            // we do not use the value serializer in the btree since duplicates will use
139            // either BTreeRedirect objects or AvlTree objects whose marshalling is
140            // explicitly managed by this code.  Value serialization is delegated to these
141            // marshallers.
142
143            bt = new BTree<>( recMan, keyComparator, keySerializer, null );
144            recId = bt.getRecordId();
145            recMan.setNamedObject( name, recId );
146        }
147        else
148        // Load existing BTree
149        {
150            bt = new BTree<K, V>().load( recMan, recId );
151            ( ( SerializableComparator<K> ) bt.getComparator() ).setSchemaManager( schemaManager );
152            
153            count = bt.size();
154        }
155    }
156
157
158    /**
159     * Creates a Jdbm BTree based tuple Table abstraction without duplicates 
160     * enabled using a simple key comparator.
161     *
162     * @param schemaManager The server schemaManager
163     * @param name the name of the table
164     * @param manager the record manager to be used for this table
165     * @param keyComparator a tuple comparator
166     * @param keySerializer a serializer to use for the keys instead of using
167     * default Java serialization which could be very expensive
168     * @param valueSerializer a serializer to use for the values instead of
169     * using default Java serialization which could be very expensive
170     * @throws IOException if the table's file cannot be created
171     */
172    public JdbmTable( SchemaManager schemaManager, String name, RecordManager manager, Comparator<K> keyComparator,
173        Serializer keySerializer, Serializer valueSerializer )
174        throws IOException
175    {
176        super( schemaManager, name, keyComparator, null );
177
178        this.duplicateBtrees = null;
179        this.numDupLimit = Integer.MAX_VALUE;
180        this.recMan = manager;
181
182        this.valueSerializer = valueSerializer;
183
184        this.allowsDuplicates = false;
185
186        long recId = recMan.getNamedObject( name );
187
188        if ( recId != 0 )
189        {
190            bt = new BTree<K, V>().load( recMan, recId );
191            ( ( SerializableComparator<K> ) bt.getComparator() ).setSchemaManager( schemaManager );
192            bt.setValueSerializer( valueSerializer );
193            
194            count = bt.size();
195        }
196        else
197        {
198            bt = new BTree<>( recMan, keyComparator, keySerializer, valueSerializer );
199            recId = bt.getRecordId();
200            recMan.setNamedObject( name, recId );
201        }
202    }
203
204
205    // ------------------------------------------------------------------------
206    // Count Overloads
207    // ------------------------------------------------------------------------
208    /**
209     * {@inheritDoc}
210     */
211    @Override
212    public long count( PartitionTxn transaction, K key ) throws LdapException
213    {
214        if ( key == null )
215        {
216            return 0L;
217        }
218
219        try
220        {
221            if ( !allowsDuplicates )
222            {
223                if ( null == bt.find( key ) )
224                {
225                    return 0L;
226                }
227                else
228                {
229                    return 1L;
230                }
231            }
232
233            DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
234
235            if ( values.isArrayTree() )
236            {
237                return values.getArrayTree().size();
238            }
239
240            return getBTree( values.getBTreeRedirect() ).size();
241        }
242        catch ( IOException ioe )
243        {
244            throw new LdapOtherException( ioe.getMessage() );
245        }
246    }
247
248
249    // ------------------------------------------------------------------------
250    // get/has/put/remove Methods and Overloads
251    // ------------------------------------------------------------------------
252
253    /**
254     * {@inheritDoc}
255     */
256    @SuppressWarnings("unchecked")
257    @Override
258    public V get( PartitionTxn transaction, K key ) throws LdapException
259    {
260        if ( key == null )
261        {
262            return null;
263        }
264
265        try
266        {
267            if ( !allowsDuplicates )
268            {
269                return bt.find( key );
270            }
271
272            DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
273
274            if ( values.isArrayTree() )
275            {
276                ArrayTree<V> set = values.getArrayTree();
277
278                if ( set.getFirst() == null )
279                {
280                    return null;
281                }
282
283                return set.getFirst();
284            }
285
286            // Handle values if they are stored in another BTree
287            BTree tree = getBTree( values.getBTreeRedirect() );
288
289            jdbm.helper.Tuple tuple = new jdbm.helper.Tuple();
290            TupleBrowser<K, V> browser = tree.browse();
291            browser.getNext( tuple );
292
293            return ( V ) tuple.getKey();
294        }
295        catch ( IOException ioe )
296        {
297            throw new LdapOtherException( ioe.getMessage() );
298        }
299
300    }
301
302
303    /**
304     * {@inheritDoc}
305     */
306    @Override
307    public boolean hasGreaterOrEqual( PartitionTxn transaction, K key, V val ) throws LdapException
308    {
309        if ( key == null )
310        {
311            return false;
312        }
313
314        if ( !allowsDuplicates )
315        {
316            throw new UnsupportedOperationException( I18n.err( I18n.ERR_593 ) );
317        }
318
319        try
320        {
321            DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
322
323            if ( values.isArrayTree() )
324            {
325                ArrayTree<V> set = values.getArrayTree();
326                V result = set.findGreaterOrEqual( val );
327                return result != null;
328            }
329
330            // last option is to try a btree with BTreeRedirects
331            BTree<K, V> tree = getBTree( values.getBTreeRedirect() );
332
333            return tree.size() != 0 && btreeHas( tree, val, true );
334        }
335        catch ( IOException ioe )
336        {
337            throw new LdapOtherException( ioe.getMessage() );
338        }
339    }
340
341
342    /**
343     * {@inheritDoc}
344     */
345    @Override
346    public boolean hasLessOrEqual( PartitionTxn transaction, K key, V val ) throws LdapException
347    {
348        if ( key == null )
349        {
350            return false;
351        }
352
353        if ( !allowsDuplicates )
354        {
355            throw new UnsupportedOperationException( I18n.err( I18n.ERR_593 ) );
356        }
357
358        try
359        {
360            DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
361
362            if ( values.isArrayTree() )
363            {
364                ArrayTree<V> set = values.getArrayTree();
365                V result = set.findLessOrEqual( val );
366                return result != null;
367            }
368
369            // last option is to try a btree with BTreeRedirects
370            BTree<K, V> tree = getBTree( values.getBTreeRedirect() );
371
372            return tree.size() != 0 && btreeHas( tree, val, false );
373        }
374        catch ( IOException ioe )
375        {
376            throw new LdapOtherException( ioe.getMessage() );
377        }
378    }
379
380
381    /**
382     * {@inheritDoc}
383     */
384    @Override
385    @SuppressWarnings("unchecked")
386    public boolean hasGreaterOrEqual( PartitionTxn transaction, K key ) throws LdapException
387    {
388        try
389        {
390            // See if we can find the border between keys greater than and less
391            // than in the set of keys.  This will be the spot we search from.
392            jdbm.helper.Tuple tuple = bt.findGreaterOrEqual( key );
393    
394            // Test for equality first since it satisfies both greater/less than
395            if ( null != tuple && keyComparator.compare( ( K ) tuple.getKey(), key ) == 0 )
396            {
397                return true;
398            }
399    
400            // Greater searches are easy and quick thanks to findGreaterOrEqual
401            // A null return above means there were no equal or greater keys
402            return ( null != tuple );
403        }
404        catch ( IOException ioe )
405        {
406            throw new LdapOtherException( ioe.getMessage() );
407        }
408    }
409
410
411    /**
412     * {@inheritDoc}
413     */
414    @SuppressWarnings("unchecked")
415    @Override
416    public boolean hasLessOrEqual( PartitionTxn transaction, K key ) throws LdapException
417    {
418        try
419        {
420            // Can only find greater than or equal to with JDBM so we find that
421            // and work backwards to see if we can find one less than the key
422            Tuple<K, V> tuple = bt.findGreaterOrEqual( key );
423    
424            // Test for equality first since it satisfies equal to condition
425            if ( null != tuple && keyComparator.compare( tuple.getKey(), key ) == 0 )
426            {
427                return true;
428            }
429    
430            if ( null == tuple )
431            {
432                /*
433                 * Jdbm failed to find a key greater than or equal to the argument
434                 * which means all the keys in the table are less than the
435                 * supplied key argument.  We can hence return true if the table
436                 * contains any Tuples.
437                 */
438                return count > 0;
439            }
440            else
441            {
442                /*
443                 * We have the next tuple whose key is the next greater than the
444                 * key argument supplied.  We use this key to advance a browser to
445                 * that tuple and scan down to lesser Tuples until we hit one
446                 * that is less than the key argument supplied.  Usually this will
447                 * be the previous tuple if it exists.
448                 */
449                TupleBrowser browser = bt.browse( tuple.getKey() );
450    
451                if ( browser.getPrevious( tuple ) )
452                {
453                    return true;
454                }
455            }
456    
457            return false;
458        }
459        catch ( IOException ioe )
460        {
461            throw new LdapOtherException( ioe.getMessage() );
462        }
463    }
464
465
466    /**
467     * {@inheritDoc}
468     */
469    @SuppressWarnings("unchecked")
470    public boolean has( PartitionTxn transaction, K key, V value ) throws LdapException
471    {
472        if ( key == null )
473        {
474            return false;
475        }
476
477        try
478        {
479            if ( !allowsDuplicates )
480            {
481                V stored = bt.find( key );
482                return null != stored && stored.equals( value );
483            }
484
485            DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
486
487            if ( values.isArrayTree() )
488            {
489                return values.getArrayTree().find( value ) != null;
490            }
491
492            return getBTree( values.getBTreeRedirect() ).find( value ) != null;
493        }
494        catch ( IOException ioe )
495        {
496            throw new LdapOtherException( ioe.getMessage() );
497        }
498    }
499
500
501    /**
502     * {@inheritDoc}
503     */
504    @Override
505    public boolean has( PartitionTxn transaction, K key ) throws LdapException
506    {
507        try
508        {
509            return key != null && bt.find( key ) != null;
510        }
511        catch ( IOException ioe )
512        {
513            throw new LdapException( ioe );
514        }
515    }
516
517
518    /**
519     * {@inheritDoc}
520     */
521    @Override
522    @SuppressWarnings("unchecked")
523    public synchronized void put( PartitionTxn transaction, K key, V value ) throws LdapException
524    {
525        try
526        {
527            if ( LOG.isDebugEnabled() )
528            {
529                LOG.debug( "---> Add {} = {}", name, key );
530            }
531
532            if ( ( value == null ) || ( key == null ) )
533            {
534                throw new IllegalArgumentException( I18n.err( I18n.ERR_594 ) );
535            }
536
537            V replaced;
538
539            if ( !allowsDuplicates )
540            {
541                replaced = ( V ) bt.insert( key, value, true );
542
543                if ( null == replaced )
544                {
545                    count++;
546                }
547
548                if ( LOG.isDebugEnabled() )
549                {
550                    LOG.debug( "<--- Add ONE {} = {}", name, key );
551                }
552
553                return;
554            }
555
556            DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
557
558            if ( values.isArrayTree() )
559            {
560                ArrayTree<V> set = values.getArrayTree();
561                replaced = set.insert( value );
562
563                if ( replaced != null )// if the value already present returns the same value
564                {
565                    return;
566                }
567
568                if ( set.size() > numDupLimit )
569                {
570                    BTree tree = convertToBTree( set );
571                    BTreeRedirect redirect = new BTreeRedirect( tree.getRecordId() );
572                    bt.insert( key, ( V ) BTreeRedirectMarshaller.INSTANCE.serialize( redirect ), true );
573
574                    if ( LOG.isDebugEnabled() )
575                    {
576                        LOG.debug( "<--- Add new BTREE {} = {}", name, key );
577                    }
578                }
579                else
580                {
581                    bt.insert( key, ( V ) marshaller.serialize( set ), true );
582
583                    if ( LOG.isDebugEnabled() )
584                    {
585                        LOG.debug( "<--- Add AVL {} = {}", name, key );
586                    }
587                }
588
589                count++;
590
591                return;
592            }
593
594            BTree tree = getBTree( values.getBTreeRedirect() );
595            replaced = ( V ) tree.insert( value, Strings.EMPTY_BYTES, true );
596
597            if ( replaced == null )
598            {
599                count++;
600            }
601
602            if ( LOG.isDebugEnabled() )
603            {
604                LOG.debug( "<--- Add BTREE {} = {}", name, key );
605            }
606        }
607        catch ( IOException | CursorException | LdapException e )
608        {
609            LOG.error( I18n.err( I18n.ERR_131, key, name ), e );
610            throw new LdapOtherException( e.getMessage(), e );
611        }
612    }
613
614
615    /**
616     * {@inheritDoc}
617     */
618    @SuppressWarnings("unchecked")
619    @Override
620    public synchronized void remove( PartitionTxn transaction, K key, V value ) throws LdapException
621    {
622        try
623        {
624            if ( LOG.isDebugEnabled() )
625            {
626                LOG.debug( "---> Remove {} = {}, {}", name, key, value );
627            }
628
629            if ( key == null )
630            {
631                if ( LOG.isDebugEnabled() )
632                {
633                    LOG.debug( "<--- Remove NULL key {}", name );
634                }
635
636                return;
637            }
638
639            if ( !allowsDuplicates )
640            {
641                V oldValue = bt.find( key );
642
643                // Remove the value only if it is the same as value.
644                if ( ( oldValue != null ) && oldValue.equals( value ) )
645                {
646                    bt.remove( key );
647                    count--;
648
649                    if ( LOG.isDebugEnabled() )
650                    {
651                        LOG.debug( "<--- Remove ONE {} = {}, {}", name, key, value );
652                    }
653                }
654
655                return;
656            }
657
658            DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
659
660            if ( values.isArrayTree() )
661            {
662                ArrayTree<V> set = values.getArrayTree();
663
664                // If removal succeeds then remove if set is empty else replace it
665                if ( set.remove( value ) != null )
666                {
667                    if ( set.isEmpty() )
668                    {
669                        bt.remove( key );
670                    }
671                    else
672                    {
673                        bt.insert( key, ( V ) marshaller.serialize( set ), true );
674                    }
675
676                    count--;
677
678                    if ( LOG.isDebugEnabled() )
679                    {
680                        LOG.debug( "<--- Remove AVL {} = {}, {}", name, key, value );
681                    }
682                }
683
684                return;
685            }
686
687            // if the number of duplicates falls below the numDupLimit value
688            BTree tree = getBTree( values.getBTreeRedirect() );
689
690            if ( tree.find( value ) != null && tree.remove( value ) != null )
691            {
692                /*
693                 * If we drop below the duplicate limit then we revert from using
694                 * a Jdbm BTree to using an in memory AvlTree.
695                 */
696                if ( tree.size() <= numDupLimit )
697                {
698                    ArrayTree<V> avlTree = convertToArrayTree( tree );
699                    bt.insert( key, ( V ) marshaller.serialize( avlTree ), true );
700                    recMan.delete( tree.getRecordId() );
701                }
702
703                count--;
704
705                if ( LOG.isDebugEnabled() )
706                {
707                    LOG.debug( "<--- Remove BTREE {} = {}, {}", name, key, value );
708                }
709            }
710        }
711        catch ( Exception e )
712        {
713            LOG.error( I18n.err( I18n.ERR_132, key, value, name ), e );
714        }
715    }
716
717
718    /**
719     * {@inheritDoc}
720     */
721    @Override
722    public synchronized void remove( PartitionTxn transaction, K key ) throws LdapException
723    {
724        try
725        {
726            if ( LOG.isDebugEnabled() )
727            {
728                LOG.debug( "---> Remove {} = {}", name, key );
729            }
730
731            if ( key == null )
732            {
733                return;
734            }
735
736            Object returned = bt.remove( key );
737
738            if ( null == returned )
739            {
740                if ( LOG.isDebugEnabled() )
741                {
742                    LOG.debug( "<--- Remove AVL {} = {} (not found)", name, key );
743                }
744
745                return;
746            }
747
748            if ( !allowsDuplicates )
749            {
750                this.count--;
751
752                if ( LOG.isDebugEnabled() )
753                {
754                    LOG.debug( "<--- Remove ONE {} = {}", name, key );
755                }
756
757                return;
758            }
759
760            byte[] serialized = ( byte[] ) returned;
761
762            if ( BTreeRedirectMarshaller.isRedirect( serialized ) )
763            {
764                BTree tree = getBTree( BTreeRedirectMarshaller.INSTANCE.deserialize( serialized ) );
765                this.count -= tree.size();
766
767                if ( LOG.isDebugEnabled() )
768                {
769                    LOG.debug( "<--- Remove BTree {} = {}", name, key );
770                }
771
772                recMan.delete( tree.getRecordId() );
773                duplicateBtrees.remove( tree.getRecordId() );
774            }
775            else
776            {
777                ArrayTree<V> set = marshaller.deserialize( serialized );
778                this.count -= set.size();
779
780                if ( LOG.isDebugEnabled() )
781                {
782                    LOG.debug( "<--- Remove AVL {} = {}", name, key );
783                }
784            }
785        }
786        catch ( Exception e )
787        {
788            LOG.error( I18n.err( I18n.ERR_133, key, name ), e );
789        }
790    }
791
792
793    /**
794     * {@inheritDoc}
795     */
796    @Override
797    public Cursor<org.apache.directory.api.ldap.model.cursor.Tuple<K, V>> cursor()
798    {
799        if ( allowsDuplicates )
800        {
801            return new DupsCursor<>( this );
802        }
803
804        return new NoDupsCursor<>( this );
805    }
806
807
808    /**
809     * {@inheritDoc}
810     */
811    @Override
812    public Cursor<org.apache.directory.api.ldap.model.cursor.Tuple<K, V>> cursor( PartitionTxn partitionTxn, K key ) throws LdapException
813    {
814        if ( key == null )
815        {
816            return new EmptyCursor<>();
817        }
818
819        try
820        { 
821            V raw = bt.find( key );
822    
823            if ( null == raw )
824            {
825                return new EmptyCursor<>();
826            }
827    
828            if ( !allowsDuplicates )
829            {
830                return new SingletonCursor<>(
831                    new org.apache.directory.api.ldap.model.cursor.Tuple<K, V>( key, raw ) );
832            }
833    
834            byte[] serialized = ( byte[] ) raw;
835    
836            if ( BTreeRedirectMarshaller.isRedirect( serialized ) )
837            {
838                BTree tree = getBTree( BTreeRedirectMarshaller.INSTANCE.deserialize( serialized ) );
839                return new KeyTupleBTreeCursor<>( tree, key, valueComparator );
840            }
841    
842            ArrayTree<V> set = marshaller.deserialize( serialized );
843    
844            return new KeyTupleArrayCursor<>( set, key );
845        }
846        catch ( IOException ioe )
847        {
848            throw new LdapOtherException( ioe.getMessage(), ioe );
849        }
850    }
851
852
853    /**
854     * {@inheritDoc}
855     */
856    @SuppressWarnings("unchecked")
857    @Override
858    public Cursor<V> valueCursor( PartitionTxn transaction, K key ) throws LdapException
859    {
860        if ( key == null )
861        {
862            return new EmptyCursor<>();
863        }
864
865        try
866        {
867            V raw = bt.find( key );
868    
869            if ( null == raw )
870            {
871                return new EmptyCursor<>();
872            }
873    
874            if ( !allowsDuplicates )
875            {
876                return new SingletonCursor<>( raw );
877            }
878    
879            byte[] serialized = ( byte[] ) raw;
880    
881            if ( BTreeRedirectMarshaller.isRedirect( serialized ) )
882            {
883                BTree tree = getBTree( BTreeRedirectMarshaller.INSTANCE.deserialize( serialized ) );
884                return new KeyBTreeCursor<>( tree, valueComparator );
885            }
886    
887            return new ArrayTreeCursor<>( marshaller.deserialize( serialized ) );
888        }
889        catch ( IOException ioe )
890        {
891            throw new LdapOtherException( ioe.getMessage(), ioe );
892        }
893    }
894
895
896    // ------------------------------------------------------------------------
897    // Maintenance Operations 
898    // ------------------------------------------------------------------------
899    /**
900     * {@inheritDoc}
901     */
902    @Override
903    public synchronized void close( PartitionTxn transaction ) throws LdapException
904    {
905        // Nothing to do
906    }
907
908
909    public Marshaller<ArrayTree<V>> getMarshaller()
910    {
911        return marshaller;
912    }
913
914
915    // ------------------------------------------------------------------------
916    // Private/Package Utility Methods 
917    // ------------------------------------------------------------------------
918
919    /**
920     * Added to check that we actually switch from one data structure to the 
921     * B+Tree structure on disk for duplicates that go beyond the threshold.
922     */
923    boolean isKeyUsingBTree( K key ) throws Exception
924    {
925        if ( key == null )
926        {
927            throw new IllegalArgumentException( "key is null" );
928        }
929
930        if ( !allowsDuplicates )
931        {
932            return false;
933        }
934
935        DupsContainer<V> values = getDupsContainer( ( byte[] ) bt.find( key ) );
936
937        return values.isBTreeRedirect();
938    }
939
940
941    DupsContainer<V> getDupsContainer( byte[] serialized ) throws LdapException
942    {
943        if ( serialized == null )
944        {
945            return new DupsContainer<>( new ArrayTree<V>( valueComparator ) );
946        }
947
948        if ( BTreeRedirectMarshaller.isRedirect( serialized ) )
949        {
950            try
951            {
952                return new DupsContainer<>( BTreeRedirectMarshaller.INSTANCE.deserialize( serialized ) );
953            }
954            catch ( IOException ioe )
955            {
956                throw new LdapOtherException( ioe.getMessage() );
957            }
958
959        }
960
961        try
962        {
963            return new DupsContainer<>( marshaller.deserialize( serialized ) );
964        }
965        catch ( IOException ioe )
966        {
967            throw new LdapOtherException( ioe.getMessage() );
968        }
969    }
970
971
972    /**
973     * Returns the main BTree used by this table.
974     *
975     * @return the main JDBM BTree used by this table
976     */
977    BTree getBTree()
978    {
979        return bt;
980    }
981
982
983    BTree getBTree( BTreeRedirect redirect ) throws IOException
984    {
985        if ( duplicateBtrees.containsKey( redirect.getRecId() ) )
986        {
987            return duplicateBtrees.get( redirect.getRecId() );
988        }
989
990        BTree<K, V> tree = new BTree<K, V>().load( recMan, redirect.getRecId() );
991        ( ( SerializableComparator<K> ) tree.getComparator() ).setSchemaManager( schemaManager );
992        duplicateBtrees.put( redirect.getRecId(), tree );
993
994        return tree;
995    }
996
997
998    @SuppressWarnings("unchecked")
999    private boolean btreeHas( BTree tree, V key, boolean isGreaterThan ) throws IOException
1000    {
1001        jdbm.helper.Tuple tuple = new jdbm.helper.Tuple();
1002
1003        TupleBrowser browser = tree.browse( key );
1004
1005        if ( isGreaterThan )
1006        {
1007            return browser.getNext( tuple );
1008        }
1009        else
1010        {
1011            if ( browser.getPrevious( tuple ) )
1012            {
1013                return true;
1014            }
1015            else
1016            {
1017                /*
1018                 * getPrevious() above fails which means the browser has is
1019                 * before the first Tuple of the btree.  A call to getNext()
1020                 * should work every time.
1021                 */
1022                browser.getNext( tuple );
1023
1024                /*
1025                 * Since the browser is positioned now on the Tuple with the
1026                 * smallest key we just need to check if it equals this key
1027                 * which is the only chance for returning true.
1028                 */
1029                V firstKey = ( V ) tuple.getKey();
1030
1031                return valueComparator.compare( key, firstKey ) == 0;
1032            }
1033        }
1034    }
1035
1036
1037    @SuppressWarnings("unchecked")
1038    private ArrayTree<V> convertToArrayTree( BTree bTree ) throws IOException
1039    {
1040        ArrayTree<V> avlTree = new ArrayTree<>( valueComparator );
1041        TupleBrowser browser = bTree.browse();
1042        jdbm.helper.Tuple tuple = new jdbm.helper.Tuple();
1043
1044        while ( browser.getNext( tuple ) )
1045        {
1046            avlTree.insert( ( V ) tuple.getKey() );
1047        }
1048
1049        return avlTree;
1050    }
1051
1052
1053    private BTree<V, K> convertToBTree( ArrayTree<V> arrayTree ) throws IOException, CursorException, LdapException
1054    {
1055        BTree<V, K> bTree;
1056
1057        if ( valueSerializer != null )
1058        {
1059            bTree = new BTree<>( recMan, valueComparator, valueSerializer, null );
1060        }
1061        else
1062        {
1063            bTree = new BTree<>( recMan, valueComparator );
1064        }
1065
1066        try ( Cursor<V> keys = new ArrayTreeCursor<>( arrayTree ) )
1067        {
1068            keys.beforeFirst();
1069    
1070            while ( keys.next() )
1071            {
1072                bTree.insert( keys.get(), ( K ) Strings.EMPTY_BYTES, true );
1073            }
1074        }
1075
1076        return bTree;
1077    }
1078}