001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.directory.mavibot.btree.persisted; 021 022 023import java.io.DataInputStream; 024import java.io.DataOutputStream; 025import java.io.File; 026import java.io.FileInputStream; 027import java.io.FileOutputStream; 028import java.io.IOException; 029import java.lang.reflect.Array; 030import java.util.Arrays; 031import java.util.Collections; 032import java.util.Comparator; 033import java.util.Iterator; 034import java.util.NoSuchElementException; 035import java.util.UUID; 036 037import org.apache.directory.mavibot.btree.Tuple; 038import org.apache.directory.mavibot.btree.util.TupleReaderWriter; 039 040 041/** 042 * A utility class for sorting a large number of keys before building a BTree using {@link PersistedBTreeBuilder}. 043 * 044 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a> 045 */ 046public class BulkDataSorter<K, V> 047{ 048 private File workDir; 049 050 private int splitAfter = 1000; 051 052 private Comparator<Tuple<K, V>> tupleComparator; 053 054 private TupleReaderWriter<K, V> readerWriter; 055 056 private boolean sorted; 057 058 059 public BulkDataSorter( TupleReaderWriter<K, V> readerWriter, Comparator<Tuple<K, V>> tupleComparator, 060 int splitAfter ) 061 { 062 if ( splitAfter <= 0 ) 063 { 064 throw new IllegalArgumentException( "Value of splitAfter parameter cannot be null" ); 065 } 066 067 this.splitAfter = splitAfter; 068 069 this.workDir = new File( System.getProperty( "java.io.tmpdir" ), System.currentTimeMillis() + "-sort" ); 070 workDir.mkdir(); 071 072 this.readerWriter = readerWriter; 073 this.tupleComparator = tupleComparator; 074 } 075 076 077 public void sort( File dataFile ) throws IOException 078 { 079 int i = 0; 080 081 Tuple<K, V>[] arr = ( Tuple<K, V>[] ) Array.newInstance( Tuple.class, splitAfter ); 082 083 Tuple<K, V> t = null; 084 085 DataInputStream in = new DataInputStream( new FileInputStream( dataFile ) ); 086 087 while ( ( t = readerWriter.readUnsortedTuple( in ) ) != null ) 088 { 089 arr[i++] = t; 090 091 if ( ( i % splitAfter ) == 0 ) 092 { 093 i = 0; 094 Arrays.sort( arr, tupleComparator ); 095 096 storeSortedData( arr ); 097 } 098 } 099 100 if ( i != 0 ) 101 { 102 Tuple<K, V>[] tmp = ( Tuple<K, V>[] ) Array.newInstance( Tuple.class, i ); 103 System.arraycopy( arr, 0, tmp, 0, i ); 104 Arrays.sort( tmp, tupleComparator ); 105 106 storeSortedData( tmp ); 107 } 108 109 sorted = true; 110 } 111 112 113 private void storeSortedData( Tuple<K, V>[] arr ) throws IOException 114 { 115 File tempFile = File.createTempFile( UUID.randomUUID().toString(), ".batch", workDir ); 116 DataOutputStream out = new DataOutputStream( new FileOutputStream( tempFile ) ); 117 118 for ( Tuple<K, V> t : arr ) 119 { 120 readerWriter.storeSortedTuple( t, out ); 121 } 122 123 out.flush(); 124 out.close(); 125 } 126 127 128 public File getWorkDir() 129 { 130 return workDir; 131 } 132 133 134 public Iterator<Tuple<K, V>> getMergeSortedTuples() throws IOException 135 { 136 if ( !sorted ) 137 { 138 throw new IllegalStateException( "Data is not sorted" ); 139 } 140 141 File[] batches = workDir.listFiles(); 142 143 if ( batches.length == 0 ) 144 { 145 return Collections.EMPTY_LIST.iterator(); 146 } 147 148 final DataInputStream[] streams = new DataInputStream[batches.length]; 149 150 for ( int i = 0; i < batches.length; i++ ) 151 { 152 streams[i] = new DataInputStream( new FileInputStream( batches[i] ) ); 153 } 154 155 Iterator<Tuple<K, V>> itr = new Iterator<Tuple<K, V>>() 156 { 157 private Tuple<K, V>[] heads = ( Tuple<K, V>[] ) Array.newInstance( Tuple.class, streams.length ); 158 159 private Tuple<K, V> candidate = null; 160 161 private boolean closed; 162 163 private int candidatePos = -1; 164 165 166 @Override 167 public boolean hasNext() 168 { 169 170 if ( closed ) 171 { 172 throw new IllegalStateException( "No elements to read" ); 173 } 174 175 Tuple<K, V> available = null; 176 177 for ( int i = 0; i < streams.length; i++ ) 178 { 179 if ( heads[i] == null ) 180 { 181 heads[i] = readerWriter.readSortedTuple( streams[i] ); 182 } 183 184 if ( available == null ) 185 { 186 available = heads[i]; 187 candidatePos = i; 188 } 189 else 190 { 191 if ( ( available != null ) && ( heads[i] != null ) ) 192 { 193 int comp = tupleComparator.compare( heads[i], available ); 194 if ( comp <= 0 ) 195 { 196 available = heads[i]; 197 candidatePos = i; 198 } 199 } 200 } 201 } 202 203 heads[candidatePos] = null; 204 205 if ( available == null ) 206 { 207 for ( int i = 0; i < streams.length; i++ ) 208 { 209 if ( heads[i] != null ) 210 { 211 available = heads[i]; 212 heads[i] = readerWriter.readUnsortedTuple( streams[i] ); 213 break; 214 } 215 } 216 } 217 218 if ( available != null ) 219 { 220 candidate = available; 221 return true; 222 } 223 224 // finally close the streams 225 for ( DataInputStream in : streams ) 226 { 227 try 228 { 229 in.close(); 230 } 231 catch ( Exception e ) 232 { 233 e.printStackTrace(); 234 } 235 } 236 237 closed = true; 238 239 return false; 240 } 241 242 243 @Override 244 public Tuple<K, V> next() 245 { 246 if ( candidate == null ) 247 { 248 if ( !closed ) 249 { 250 hasNext(); 251 } 252 } 253 254 if ( candidate == null ) 255 { 256 throw new NoSuchElementException( "No tuples found" ); 257 } 258 259 return candidate; 260 } 261 262 263 @Override 264 public void remove() 265 { 266 throw new UnsupportedOperationException( "Not supported" ); 267 } 268 269 }; 270 271 return itr; 272 } 273}