/*
 *   Licensed to the Apache Software Foundation (ASF) under one
 *   or more contributor license agreements.  See the NOTICE file
 *   distributed with this work for additional information
 *   regarding copyright ownership.  The ASF licenses this file
 *   to you under the Apache License, Version 2.0 (the
 *   "License"); you may not use this file except in compliance
 *   with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 *   Unless required by applicable law or agreed to in writing,
 *   software distributed under the License is distributed on an
 *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 *   KIND, either express or implied.  See the License for the
 *   specific language governing permissions and limitations
 *   under the License.
 *
 */
020package org.apache.directory.mavibot.btree.persisted;
021
022
023import java.io.DataInputStream;
024import java.io.DataOutputStream;
025import java.io.File;
026import java.io.FileInputStream;
027import java.io.FileOutputStream;
028import java.io.IOException;
029import java.lang.reflect.Array;
030import java.util.Arrays;
031import java.util.Collections;
032import java.util.Comparator;
033import java.util.Iterator;
034import java.util.NoSuchElementException;
035import java.util.UUID;
036
037import org.apache.directory.mavibot.btree.Tuple;
038import org.apache.directory.mavibot.btree.util.TupleReaderWriter;
039
040
041/**
042 * A utility class for sorting a large number of keys before building a BTree using {@link PersistedBTreeBuilder}.
043 *
044 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
045 */
046public class BulkDataSorter<K, V>
047{
048    private File workDir;
049
050    private int splitAfter = 1000;
051
052    private Comparator<Tuple<K, V>> tupleComparator;
053
054    private TupleReaderWriter<K, V> readerWriter;
055
056    private boolean sorted;
057
058
059    public BulkDataSorter( TupleReaderWriter<K, V> readerWriter, Comparator<Tuple<K, V>> tupleComparator,
060        int splitAfter )
061    {
062        if ( splitAfter <= 0 )
063        {
064            throw new IllegalArgumentException( "Value of splitAfter parameter cannot be null" );
065        }
066
067        this.splitAfter = splitAfter;
068
069        this.workDir = new File( System.getProperty( "java.io.tmpdir" ), System.currentTimeMillis() + "-sort" );
070        workDir.mkdir();
071
072        this.readerWriter = readerWriter;
073        this.tupleComparator = tupleComparator;
074    }
075
076
077    public void sort( File dataFile ) throws IOException
078    {
079        int i = 0;
080
081        Tuple<K, V>[] arr = ( Tuple<K, V>[] ) Array.newInstance( Tuple.class, splitAfter );
082
083        Tuple<K, V> t = null;
084
085        DataInputStream in = new DataInputStream( new FileInputStream( dataFile ) );
086
087        while ( ( t = readerWriter.readUnsortedTuple( in ) ) != null )
088        {
089            arr[i++] = t;
090
091            if ( ( i % splitAfter ) == 0 )
092            {
093                i = 0;
094                Arrays.sort( arr, tupleComparator );
095
096                storeSortedData( arr );
097            }
098        }
099
100        if ( i != 0 )
101        {
102            Tuple<K, V>[] tmp = ( Tuple<K, V>[] ) Array.newInstance( Tuple.class, i );
103            System.arraycopy( arr, 0, tmp, 0, i );
104            Arrays.sort( tmp, tupleComparator );
105
106            storeSortedData( tmp );
107        }
108
109        sorted = true;
110    }
111
112
113    private void storeSortedData( Tuple<K, V>[] arr ) throws IOException
114    {
115        File tempFile = File.createTempFile( UUID.randomUUID().toString(), ".batch", workDir );
116        DataOutputStream out = new DataOutputStream( new FileOutputStream( tempFile ) );
117
118        for ( Tuple<K, V> t : arr )
119        {
120            readerWriter.storeSortedTuple( t, out );
121        }
122
123        out.flush();
124        out.close();
125    }
126
127
128    public File getWorkDir()
129    {
130        return workDir;
131    }
132
133
134    public Iterator<Tuple<K, V>> getMergeSortedTuples() throws IOException
135    {
136        if ( !sorted )
137        {
138            throw new IllegalStateException( "Data is not sorted" );
139        }
140
141        File[] batches = workDir.listFiles();
142
143        if ( batches.length == 0 )
144        {
145            return Collections.EMPTY_LIST.iterator();
146        }
147
148        final DataInputStream[] streams = new DataInputStream[batches.length];
149
150        for ( int i = 0; i < batches.length; i++ )
151        {
152            streams[i] = new DataInputStream( new FileInputStream( batches[i] ) );
153        }
154
155        Iterator<Tuple<K, V>> itr = new Iterator<Tuple<K, V>>()
156        {
157            private Tuple<K, V>[] heads = ( Tuple<K, V>[] ) Array.newInstance( Tuple.class, streams.length );
158
159            private Tuple<K, V> candidate = null;
160
161            private boolean closed;
162
163            private int candidatePos = -1;
164
165
166            @Override
167            public boolean hasNext()
168            {
169
170                if ( closed )
171                {
172                    throw new IllegalStateException( "No elements to read" );
173                }
174
175                Tuple<K, V> available = null;
176
177                for ( int i = 0; i < streams.length; i++ )
178                {
179                    if ( heads[i] == null )
180                    {
181                        heads[i] = readerWriter.readSortedTuple( streams[i] );
182                    }
183
184                    if ( available == null )
185                    {
186                        available = heads[i];
187                        candidatePos = i;
188                    }
189                    else
190                    {
191                        if ( ( available != null ) && ( heads[i] != null ) )
192                        {
193                            int comp = tupleComparator.compare( heads[i], available );
194                            if ( comp <= 0 )
195                            {
196                                available = heads[i];
197                                candidatePos = i;
198                            }
199                        }
200                    }
201                }
202
203                heads[candidatePos] = null;
204
205                if ( available == null )
206                {
207                    for ( int i = 0; i < streams.length; i++ )
208                    {
209                        if ( heads[i] != null )
210                        {
211                            available = heads[i];
212                            heads[i] = readerWriter.readUnsortedTuple( streams[i] );
213                            break;
214                        }
215                    }
216                }
217
218                if ( available != null )
219                {
220                    candidate = available;
221                    return true;
222                }
223
224                // finally close the streams
225                for ( DataInputStream in : streams )
226                {
227                    try
228                    {
229                        in.close();
230                    }
231                    catch ( Exception e )
232                    {
233                        e.printStackTrace();
234                    }
235                }
236
237                closed = true;
238
239                return false;
240            }
241
242
243            @Override
244            public Tuple<K, V> next()
245            {
246                if ( candidate == null )
247                {
248                    if ( !closed )
249                    {
250                        hasNext();
251                    }
252                }
253
254                if ( candidate == null )
255                {
256                    throw new NoSuchElementException( "No tuples found" );
257                }
258
259                return candidate;
260            }
261
262
263            @Override
264            public void remove()
265            {
266                throw new UnsupportedOperationException( "Not supported" );
267            }
268
269        };
270
271        return itr;
272    }
273}