001/*
002 *   Licensed to the Apache Software Foundation (ASF) under one
003 *   or more contributor license agreements.  See the NOTICE file
004 *   distributed with this work for additional information
005 *   regarding copyright ownership.  The ASF licenses this file
006 *   to you under the Apache License, Version 2.0 (the
007 *   "License"); you may not use this file except in compliance
008 *   with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *   Unless required by applicable law or agreed to in writing,
013 *   software distributed under the License is distributed on an
014 *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *   KIND, either express or implied.  See the License for the
016 *   specific language governing permissions and limitations
017 *   under the License.
018 *
019 */
020
021package org.apache.directory.server.ldap.replication.provider;
022
023
024import java.io.IOException;
025import java.util.Iterator;
026
027import org.apache.directory.api.ldap.model.constants.Loggers;
028import org.apache.directory.api.ldap.model.constants.SchemaConstants;
029import org.apache.directory.api.ldap.model.cursor.AbstractCursor;
030import org.apache.directory.api.ldap.model.cursor.Cursor;
031import org.apache.directory.api.ldap.model.cursor.CursorException;
032import org.apache.directory.api.ldap.model.cursor.Tuple;
033import org.apache.directory.api.ldap.model.exception.LdapException;
034import org.apache.directory.api.ldap.model.message.controls.ChangeType;
035import org.apache.directory.server.core.api.partition.PartitionTxn;
036import org.apache.directory.server.core.partition.impl.btree.jdbm.JdbmTable;
037import org.apache.directory.server.ldap.replication.ReplicaEventMessage;
038import org.slf4j.Logger;
039import org.slf4j.LoggerFactory;
040
041
042/**
043 * Define a cursor on top of a replication journal.
044 *
045 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
046 */
047public class ReplicaJournalCursor extends AbstractCursor<ReplicaEventMessage>
048{
049    /** Logger for this class */
050    private static final Logger LOG = LoggerFactory.getLogger( ReplicaJournalCursor.class );
051
052    /** A dedicated log for cursors */
053    private static final Logger LOG_CURSOR = LoggerFactory.getLogger( Loggers.CURSOR_LOG.getName() );
054
055    /** Speedup for logs */
056    private static final boolean IS_DEBUG = LOG_CURSOR.isDebugEnabled();
057
058    /** the underlying journal's cursor */
059    private Cursor<Tuple<String, ReplicaEventMessage>> tupleCursor;
060
061    /** the event log journal */
062    private JdbmTable<String, ReplicaEventMessage> journal;
063
064    /** the consumer's CSN based on which messages will be qualified for sending */
065    private String consumerCsn;
066
067    private ReplicaEventMessage qualifiedEvtMsg;
068
069    /** used while cleaning up the log */
070    private boolean skipQualifying;
071    
072    /** The partition transaction */
073    private PartitionTxn partitionTxn;
074    
075
076
077    /**
078     * Creates a cursor on top of the given journal
079     * 
080     * @param partitionTxn The Transaction to use
081     * @param journal the log journal
082     * @param consumerCsn the consumer's CSN taken from cookie
083     * @throws Exception If the cursor creation failed
084     */
085    public ReplicaJournalCursor( PartitionTxn partitionTxn, JdbmTable<String, ReplicaEventMessage> journal, 
086            String consumerCsn ) throws Exception
087    {
088        if ( IS_DEBUG )
089        {
090            LOG_CURSOR.debug( "Creating ReplicaJournalCursor {}", this );
091        }
092
093        this.journal = journal;
094        this.tupleCursor = journal.cursor();
095        this.consumerCsn = consumerCsn;
096        this.partitionTxn = partitionTxn;
097    }
098
099
100    /**
101     * {@inheritDoc}
102     */
103    public void after( ReplicaEventMessage arg0 ) throws LdapException, CursorException
104    {
105        throw new UnsupportedOperationException();
106    }
107
108
109    /**
110     * {@inheritDoc}
111     */
112    public void afterLast() throws LdapException, CursorException
113    {
114        throw new UnsupportedOperationException();
115    }
116
117
118    /**
119     * {@inheritDoc}
120     */
121    public boolean available()
122    {
123        return ( qualifiedEvtMsg != null );
124    }
125
126
127    /**
128     * {@inheritDoc}
129     */
130    public void before( ReplicaEventMessage arg0 ) throws LdapException, CursorException
131    {
132        throw new UnsupportedOperationException();
133    }
134
135
136    /**
137     * {@inheritDoc}
138     */
139    public void beforeFirst() throws LdapException, CursorException
140    {
141    }
142
143
144    /**
145     * {@inheritDoc}
146     */
147    public boolean first() throws LdapException, CursorException
148    {
149        throw new UnsupportedOperationException();
150    }
151
152
153    /**
154     * {@inheritDoc}
155     */
156    public ReplicaEventMessage get() throws CursorException
157    {
158        return qualifiedEvtMsg;
159    }
160
161
162    /**
163     * selects the current queue entry if qualified for sending to the consumer
164     * 
165     * @throws Exception
166     */
167    private boolean isQualified( String csn, ReplicaEventMessage evtMsg ) throws LdapException
168    {
169        LOG.debug( "ReplicaEventMessage: {}", evtMsg );
170
171        if ( evtMsg.isEventOlderThan( consumerCsn ) )
172        {
173            if ( LOG.isDebugEnabled() )
174            {
175                String evt = "MODDN"; // take this as default cause the event type for MODDN is null
176
177                ChangeType changeType = evtMsg.getChangeType();
178
179                if ( changeType != null )
180                {
181                    evt = changeType.name();
182                }
183
184                LOG.debug( "event {} for dn {} is not qualified for sending", evt, evtMsg.getEntry().getDn() );
185            }
186
187            return false;
188        }
189
190        return true;
191    }
192
193
194    /**
195     * {@inheritDoc}
196     */
197    public boolean last() throws LdapException, CursorException
198    {
199        throw new UnsupportedOperationException();
200    }
201
202
203    /**
204     * {@inheritDoc}
205     */
206    public boolean next() throws LdapException, CursorException
207    {
208        while ( tupleCursor.next() )
209        {
210            Tuple<String, ReplicaEventMessage> tuple = tupleCursor.get();
211
212            String csn = tuple.getKey();
213            ReplicaEventMessage message = tuple.getValue();
214
215            if ( skipQualifying )
216            {
217                qualifiedEvtMsg = message;
218                return true;
219            }
220
221            boolean qualified = isQualified( csn, message );
222
223            if ( qualified )
224            {
225                qualifiedEvtMsg = message;
226                return true;
227            }
228            else
229            {
230                journal.remove( partitionTxn, csn );
231            }
232        }
233
234        qualifiedEvtMsg = null;
235
236        return false;
237    }
238
239
240    /**
241     * {@inheritDoc}
242     */
243    public boolean previous() throws LdapException, CursorException
244    {
245        throw new UnsupportedOperationException();
246    }
247
248
249    /**
250     * {@inheritDoc}
251     */
252    @Override
253    public void close() throws IOException
254    {
255        if ( IS_DEBUG )
256        {
257            LOG_CURSOR.debug( "Closing ReplicaJournalCursor {}", this );
258        }
259
260        tupleCursor.close();
261        super.close();
262    }
263
264
265    /**
266     * {@inheritDoc}
267     */
268    @Override
269    public void close( Exception cause ) throws IOException
270    {
271        if ( IS_DEBUG )
272        {
273            LOG_CURSOR.debug( "Closing ReplicaJournalCursor {}", this );
274        }
275
276        tupleCursor.close();
277        super.close( cause );
278    }
279
280
281    /**
282     * sets the flag to skip CSN based checking while traversing
283     * used for internal log cleanup ONLY 
284     */
285    protected void skipQualifyingWhileFetching()
286    {
287        skipQualifying = true;
288    }
289
290
291    /**
292     * delete the current message
293     * used for internal log cleanup ONLY
294     */
295    protected void delete()
296    {
297        try
298        {
299            if ( qualifiedEvtMsg != null )
300            {
301                journal.remove( partitionTxn, qualifiedEvtMsg.getEntry().get( SchemaConstants.ENTRY_CSN_AT ).getString() );
302            }
303        }
304        catch ( Exception e )
305        {
306        }
307    }
308
309
310    /**
311     * {@inheritDoc}
312     */
313    @Override
314    public Iterator<ReplicaEventMessage> iterator()
315    {
316        throw new UnsupportedOperationException();
317    }
318}