001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020 021package org.apache.directory.server.ldap.replication.provider; 022 023 024import java.io.IOException; 025import java.util.Iterator; 026 027import org.apache.directory.api.ldap.model.constants.Loggers; 028import org.apache.directory.api.ldap.model.constants.SchemaConstants; 029import org.apache.directory.api.ldap.model.cursor.AbstractCursor; 030import org.apache.directory.api.ldap.model.cursor.Cursor; 031import org.apache.directory.api.ldap.model.cursor.CursorException; 032import org.apache.directory.api.ldap.model.cursor.Tuple; 033import org.apache.directory.api.ldap.model.exception.LdapException; 034import org.apache.directory.api.ldap.model.message.controls.ChangeType; 035import org.apache.directory.server.core.api.partition.PartitionTxn; 036import org.apache.directory.server.core.partition.impl.btree.jdbm.JdbmTable; 037import org.apache.directory.server.ldap.replication.ReplicaEventMessage; 038import org.slf4j.Logger; 039import org.slf4j.LoggerFactory; 040 041 042/** 043 * Define a cursor on top of a replication journal. 044 * 045 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a> 046 */ 047public class ReplicaJournalCursor extends AbstractCursor<ReplicaEventMessage> 048{ 049 /** Logger for this class */ 050 private static final Logger LOG = LoggerFactory.getLogger( ReplicaJournalCursor.class ); 051 052 /** A dedicated log for cursors */ 053 private static final Logger LOG_CURSOR = LoggerFactory.getLogger( Loggers.CURSOR_LOG.getName() ); 054 055 /** Speedup for logs */ 056 private static final boolean IS_DEBUG = LOG_CURSOR.isDebugEnabled(); 057 058 /** the underlying journal's cursor */ 059 private Cursor<Tuple<String, ReplicaEventMessage>> tupleCursor; 060 061 /** the event log journal */ 062 private JdbmTable<String, ReplicaEventMessage> journal; 063 064 /** the consumer's CSN based on which messages will be qualified for sending */ 065 private String consumerCsn; 066 067 private ReplicaEventMessage qualifiedEvtMsg; 068 069 /** used while cleaning up the log */ 070 private boolean skipQualifying; 071 072 /** The partition transaction */ 073 private PartitionTxn partitionTxn; 074 075 076 077 /** 078 * Creates a cursor on top of the given journal 079 * 080 * @param partitionTxn The Transaction to use 081 * @param journal the log journal 082 * @param consumerCsn the consumer's CSN taken from cookie 083 * @throws Exception If the cursor creation failed 084 */ 085 public ReplicaJournalCursor( PartitionTxn partitionTxn, JdbmTable<String, ReplicaEventMessage> journal, 086 String consumerCsn ) throws Exception 087 { 088 if ( IS_DEBUG ) 089 { 090 LOG_CURSOR.debug( "Creating ReplicaJournalCursor {}", this ); 091 } 092 093 this.journal = journal; 094 this.tupleCursor = journal.cursor(); 095 this.consumerCsn = consumerCsn; 096 this.partitionTxn = partitionTxn; 097 } 098 099 100 /** 101 * {@inheritDoc} 102 */ 103 public void after( ReplicaEventMessage arg0 ) throws LdapException, CursorException 104 { 105 throw new UnsupportedOperationException(); 106 } 107 108 109 /** 110 * {@inheritDoc} 111 */ 112 public void afterLast() throws LdapException, CursorException 113 { 114 throw new UnsupportedOperationException(); 115 } 116 117 118 /** 119 * {@inheritDoc} 120 */ 121 public boolean available() 122 { 123 return ( qualifiedEvtMsg != null ); 124 } 125 126 127 /** 128 * {@inheritDoc} 129 */ 130 public void before( ReplicaEventMessage arg0 ) throws LdapException, CursorException 131 { 132 throw new UnsupportedOperationException(); 133 } 134 135 136 /** 137 * {@inheritDoc} 138 */ 139 public void beforeFirst() throws LdapException, CursorException 140 { 141 } 142 143 144 /** 145 * {@inheritDoc} 146 */ 147 public boolean first() throws LdapException, CursorException 148 { 149 throw new UnsupportedOperationException(); 150 } 151 152 153 /** 154 * {@inheritDoc} 155 */ 156 public ReplicaEventMessage get() throws CursorException 157 { 158 return qualifiedEvtMsg; 159 } 160 161 162 /** 163 * selects the current queue entry if qualified for sending to the consumer 164 * 165 * @throws Exception 166 */ 167 private boolean isQualified( String csn, ReplicaEventMessage evtMsg ) throws LdapException 168 { 169 LOG.debug( "ReplicaEventMessage: {}", evtMsg ); 170 171 if ( evtMsg.isEventOlderThan( consumerCsn ) ) 172 { 173 if ( LOG.isDebugEnabled() ) 174 { 175 String evt = "MODDN"; // take this as default cause the event type for MODDN is null 176 177 ChangeType changeType = evtMsg.getChangeType(); 178 179 if ( changeType != null ) 180 { 181 evt = changeType.name(); 182 } 183 184 LOG.debug( "event {} for dn {} is not qualified for sending", evt, evtMsg.getEntry().getDn() ); 185 } 186 187 return false; 188 } 189 190 return true; 191 } 192 193 194 /** 195 * {@inheritDoc} 196 */ 197 public boolean last() throws LdapException, CursorException 198 { 199 throw new UnsupportedOperationException(); 200 } 201 202 203 /** 204 * {@inheritDoc} 205 */ 206 public boolean next() throws LdapException, CursorException 207 { 208 while ( tupleCursor.next() ) 209 { 210 Tuple<String, ReplicaEventMessage> tuple = tupleCursor.get(); 211 212 String csn = tuple.getKey(); 213 ReplicaEventMessage message = tuple.getValue(); 214 215 if ( skipQualifying ) 216 { 217 qualifiedEvtMsg = message; 218 return true; 219 } 220 221 boolean qualified = isQualified( csn, message ); 222 223 if ( qualified ) 224 { 225 qualifiedEvtMsg = message; 226 return true; 227 } 228 else 229 { 230 journal.remove( partitionTxn, csn ); 231 } 232 } 233 234 qualifiedEvtMsg = null; 235 236 return false; 237 } 238 239 240 /** 241 * {@inheritDoc} 242 */ 243 public boolean previous() throws LdapException, CursorException 244 { 245 throw new UnsupportedOperationException(); 246 } 247 248 249 /** 250 * {@inheritDoc} 251 */ 252 @Override 253 public void close() throws IOException 254 { 255 if ( IS_DEBUG ) 256 { 257 LOG_CURSOR.debug( "Closing ReplicaJournalCursor {}", this ); 258 } 259 260 tupleCursor.close(); 261 super.close(); 262 } 263 264 265 /** 266 * {@inheritDoc} 267 */ 268 @Override 269 public void close( Exception cause ) throws IOException 270 { 271 if ( IS_DEBUG ) 272 { 273 LOG_CURSOR.debug( "Closing ReplicaJournalCursor {}", this ); 274 } 275 276 tupleCursor.close(); 277 super.close( cause ); 278 } 279 280 281 /** 282 * sets the flag to skip CSN based checking while traversing 283 * used for internal log cleanup ONLY 284 */ 285 protected void skipQualifyingWhileFetching() 286 { 287 skipQualifying = true; 288 } 289 290 291 /** 292 * delete the current message 293 * used for internal log cleanup ONLY 294 */ 295 protected void delete() 296 { 297 try 298 { 299 if ( qualifiedEvtMsg != null ) 300 { 301 journal.remove( partitionTxn, qualifiedEvtMsg.getEntry().get( SchemaConstants.ENTRY_CSN_AT ).getString() ); 302 } 303 } 304 catch ( Exception e ) 305 { 306 } 307 } 308 309 310 /** 311 * {@inheritDoc} 312 */ 313 @Override 314 public Iterator<ReplicaEventMessage> iterator() 315 { 316 throw new UnsupportedOperationException(); 317 } 318}