View Javadoc
1   /*
2    *   Licensed to the Apache Software Foundation (ASF) under one
3    *   or more contributor license agreements.  See the NOTICE file
4    *   distributed with this work for additional information
5    *   regarding copyright ownership.  The ASF licenses this file
6    *   to you under the Apache License, Version 2.0 (the
7    *   "License"); you may not use this file except in compliance
8    *   with the License.  You may obtain a copy of the License at
9    *
10   *     http://www.apache.org/licenses/LICENSE-2.0
11   *
12   *   Unless required by applicable law or agreed to in writing,
13   *   software distributed under the License is distributed on an
14   *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15   *   KIND, either express or implied.  See the License for the
16   *   specific language governing permissions and limitations
17   *   under the License.
18   *
19   */
20  package org.apache.directory.mavibot.btree.persisted;
21  
22  
23  import java.io.DataInputStream;
24  import java.io.DataOutputStream;
25  import java.io.File;
26  import java.io.FileInputStream;
27  import java.io.FileOutputStream;
28  import java.io.IOException;
29  import java.lang.reflect.Array;
30  import java.util.Arrays;
31  import java.util.Collections;
32  import java.util.Comparator;
33  import java.util.Iterator;
34  import java.util.NoSuchElementException;
35  import java.util.UUID;
36  
37  import org.apache.directory.mavibot.btree.Tuple;
38  import org.apache.directory.mavibot.btree.util.TupleReaderWriter;
39  
40  
41  /**
42   * A utility class for sorting a large number of keys before building a BTree using {@link PersistedBTreeBuilder}.
43   *
44   * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
45   */
46  public class BulkDataSorter<K, V>
47  {
48      private File workDir;
49  
50      private int splitAfter = 1000;
51  
52      private Comparator<Tuple<K, V>> tupleComparator;
53  
54      private TupleReaderWriter<K, V> readerWriter;
55  
56      private boolean sorted;
57  
58  
59      public BulkDataSorter( TupleReaderWriter<K, V> readerWriter, Comparator<Tuple<K, V>> tupleComparator,
60          int splitAfter )
61      {
62          if ( splitAfter <= 0 )
63          {
64              throw new IllegalArgumentException( "Value of splitAfter parameter cannot be null" );
65          }
66  
67          this.splitAfter = splitAfter;
68  
69          this.workDir = new File( System.getProperty( "java.io.tmpdir" ), System.currentTimeMillis() + "-sort" );
70          workDir.mkdir();
71  
72          this.readerWriter = readerWriter;
73          this.tupleComparator = tupleComparator;
74      }
75  
76  
77      public void sort( File dataFile ) throws IOException
78      {
79          int i = 0;
80  
81          Tuple<K, V>[] arr = ( Tuple<K, V>[] ) Array.newInstance( Tuple.class, splitAfter );
82  
83          Tuple<K, V> t = null;
84  
85          DataInputStream in = new DataInputStream( new FileInputStream( dataFile ) );
86  
87          while ( ( t = readerWriter.readUnsortedTuple( in ) ) != null )
88          {
89              arr[i++] = t;
90  
91              if ( ( i % splitAfter ) == 0 )
92              {
93                  i = 0;
94                  Arrays.sort( arr, tupleComparator );
95  
96                  storeSortedData( arr );
97              }
98          }
99  
100         if ( i != 0 )
101         {
102             Tuple<K, V>[] tmp = ( Tuple<K, V>[] ) Array.newInstance( Tuple.class, i );
103             System.arraycopy( arr, 0, tmp, 0, i );
104             Arrays.sort( tmp, tupleComparator );
105 
106             storeSortedData( tmp );
107         }
108 
109         sorted = true;
110     }
111 
112 
113     private void storeSortedData( Tuple<K, V>[] arr ) throws IOException
114     {
115         File tempFile = File.createTempFile( UUID.randomUUID().toString(), ".batch", workDir );
116         DataOutputStream out = new DataOutputStream( new FileOutputStream( tempFile ) );
117 
118         for ( Tuple<K, V> t : arr )
119         {
120             readerWriter.storeSortedTuple( t, out );
121         }
122 
123         out.flush();
124         out.close();
125     }
126 
127 
128     public File getWorkDir()
129     {
130         return workDir;
131     }
132 
133 
134     public Iterator<Tuple<K, V>> getMergeSortedTuples() throws IOException
135     {
136         if ( !sorted )
137         {
138             throw new IllegalStateException( "Data is not sorted" );
139         }
140 
141         File[] batches = workDir.listFiles();
142 
143         if ( batches.length == 0 )
144         {
145             return Collections.EMPTY_LIST.iterator();
146         }
147 
148         final DataInputStream[] streams = new DataInputStream[batches.length];
149 
150         for ( int i = 0; i < batches.length; i++ )
151         {
152             streams[i] = new DataInputStream( new FileInputStream( batches[i] ) );
153         }
154 
155         Iterator<Tuple<K, V>> itr = new Iterator<Tuple<K, V>>()
156         {
157             private Tuple<K, V>[] heads = ( Tuple<K, V>[] ) Array.newInstance( Tuple.class, streams.length );
158 
159             private Tuple<K, V> candidate = null;
160 
161             private boolean closed;
162 
163             private int candidatePos = -1;
164 
165 
166             @Override
167             public boolean hasNext()
168             {
169 
170                 if ( closed )
171                 {
172                     throw new IllegalStateException( "No elements to read" );
173                 }
174 
175                 Tuple<K, V> available = null;
176 
177                 for ( int i = 0; i < streams.length; i++ )
178                 {
179                     if ( heads[i] == null )
180                     {
181                         heads[i] = readerWriter.readSortedTuple( streams[i] );
182                     }
183 
184                     if ( available == null )
185                     {
186                         available = heads[i];
187                         candidatePos = i;
188                     }
189                     else
190                     {
191                         if ( ( available != null ) && ( heads[i] != null ) )
192                         {
193                             int comp = tupleComparator.compare( heads[i], available );
194                             if ( comp <= 0 )
195                             {
196                                 available = heads[i];
197                                 candidatePos = i;
198                             }
199                         }
200                     }
201                 }
202 
203                 heads[candidatePos] = null;
204 
205                 if ( available == null )
206                 {
207                     for ( int i = 0; i < streams.length; i++ )
208                     {
209                         if ( heads[i] != null )
210                         {
211                             available = heads[i];
212                             heads[i] = readerWriter.readUnsortedTuple( streams[i] );
213                             break;
214                         }
215                     }
216                 }
217 
218                 if ( available != null )
219                 {
220                     candidate = available;
221                     return true;
222                 }
223 
224                 // finally close the streams
225                 for ( DataInputStream in : streams )
226                 {
227                     try
228                     {
229                         in.close();
230                     }
231                     catch ( Exception e )
232                     {
233                         e.printStackTrace();
234                     }
235                 }
236 
237                 closed = true;
238 
239                 return false;
240             }
241 
242 
243             @Override
244             public Tuple<K, V> next()
245             {
246                 if ( candidate == null )
247                 {
248                     if ( !closed )
249                     {
250                         hasNext();
251                     }
252                 }
253 
254                 if ( candidate == null )
255                 {
256                     throw new NoSuchElementException( "No tuples found" );
257                 }
258 
259                 return candidate;
260             }
261 
262 
263             @Override
264             public void remove()
265             {
266                 throw new UnsupportedOperationException( "Not supported" );
267             }
268 
269         };
270 
271         return itr;
272     }
273 }