001/*
002 *   Licensed to the Apache Software Foundation (ASF) under one
003 *   or more contributor license agreements.  See the NOTICE file
004 *   distributed with this work for additional information
005 *   regarding copyright ownership.  The ASF licenses this file
006 *   to you under the Apache License, Version 2.0 (the
007 *   "License"); you may not use this file except in compliance
008 *   with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *   Unless required by applicable law or agreed to in writing,
013 *   software distributed under the License is distributed on an
014 *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *   KIND, either express or implied.  See the License for the
016 *   specific language governing permissions and limitations
017 *   under the License.
018 *
019 */
020
021package org.apache.directory.server.ldap.replication.provider;
022
023
024import java.io.File;
025import java.io.IOException;
026
027import jdbm.RecordManager;
028import jdbm.recman.BaseRecordManager;
029import jdbm.recman.TransactionManager;
030
031import org.apache.directory.api.ldap.model.constants.Loggers;
032import org.apache.directory.api.ldap.model.constants.SchemaConstants;
033import org.apache.directory.api.ldap.model.exception.LdapException;
034import org.apache.directory.api.ldap.model.name.Dn;
035import org.apache.directory.api.ldap.model.schema.SchemaManager;
036import org.apache.directory.api.ldap.model.schema.comparators.SerializableComparator;
037import org.apache.directory.server.core.api.DirectoryService;
038import org.apache.directory.server.core.api.event.EventType;
039import org.apache.directory.server.core.api.event.NotificationCriteria;
040import org.apache.directory.server.core.api.partition.PartitionTxn;
041import org.apache.directory.server.core.partition.impl.btree.jdbm.JdbmTable;
042import org.apache.directory.server.core.partition.impl.btree.jdbm.StringSerializer;
043import org.apache.directory.server.ldap.replication.ReplicaEventMessage;
044import org.apache.directory.server.ldap.replication.ReplicaEventMessageSerializer;
045import org.slf4j.Logger;
046import org.slf4j.LoggerFactory;
047
048
049/**
050 * A structure storing the configuration on each consumer registered on a producer. It stores 
051 * the following informations :
052 * <ul>
053 * <li>replicaId : the internal ID associated with the consumer on the provider</li>
054 * <li>hostname : the consumer's host</li>
055 * <li>searchFilter : the filter</li>
056 * <li>lastSentCsn : the last CSN sent by the consumer</li>
057 * <li>refreshNPersist : a flag indicating that the consumer is processing in Refresh and persist mode</li>
058 * <li></li>
059 * </ul>
060 * A separate log is maintained for each syncrepl consumer.<br>
061 * We also associate a Queue with each structure, which will store the messages to send to the consumer.
062 *
063 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
064 */
065public class ReplicaEventLog implements Comparable<ReplicaEventLog>
066{
067    /** The logger */
068    private static final Logger LOG = LoggerFactory.getLogger( ReplicaEventLog.class );
069
070    /** A logger for the replication provider */
071    private static final Logger PROVIDER_LOG = LoggerFactory.getLogger( Loggers.PROVIDER_LOG.getName() );
072
073    /** hostname of the syncrepl consumer */
074    private String hostName;
075
076    /** the unmodified search filter as it was when received from the client */
077    private String searchFilter;
078
079    /** the csn that was sent to the client during the last sync session*/
080    private String lastSentCsn;
081
082    /** the persistent listener */
083    private SyncReplSearchListener persistentListener;
084
085    /** notification criteria used by the persistent search */
086    private NotificationCriteria searchCriteria;
087
088    /** the replica id */
089    private int replicaId;
090
091    /** flag indicating refreshAndPersist mode */
092    private boolean refreshNPersist;
093
094    /** the duration(in seconds) of consumer inactivity after which this log will be deleted. Defaults to 172800 seconds (i.e. 2 days) */
095    private long maxIdlePeriod = DEFAULT_MAX_IDLE_PERIOD;
096
097    /** the minimum number of entries to be present for beginning purging entries older than the last sent CSN. Default is 10000 */
098    private int purgeThresholdCount = DEFAULT_PURGE_THRESHOLD_COUNT;
099
100    // fields that won't be serialized
101    /** The Journal of modifications */
102    private JdbmTable<String, ReplicaEventMessage> journal;
103
104    /** the underlying file  */
105    private File journalFile;
106
107    /** The record manager*/
108    private RecordManager recman;
109
110    /** A flag used to indicate that the consumer is not up to date */
111    private volatile boolean dirty;
112
113    /** the DN of the entry where this event log details are stored */
114    private Dn consumerEntryDn;
115
116    public static final String REPLICA_EVENT_LOG_NAME_PREFIX = "REPL_EVENT_LOG.";
117
118    public static final int DEFAULT_PURGE_THRESHOLD_COUNT = 10000;
119
120    /** The max delay for an idle replication log with no activity, by default the logs have no idle time period */
121    public static final int DEFAULT_MAX_IDLE_PERIOD = -1;
122    
123    /** The partition transaction */
124    private PartitionTxn partitionTxn;
125
126
127    /**
128     * Creates a new instance of EventLog for a replica
129     * 
130     * @param partitionTxn The Transaction to use
131     * @param directoryService The DirectoryService instance
132     * @param replicaId The replica ID
133     * @throws IOException if we weren't able to log the event
134     */
135    public ReplicaEventLog( PartitionTxn partitionTxn, DirectoryService directoryService, int replicaId ) throws IOException
136    {
137        PROVIDER_LOG.debug( "Creating the replication queue for replica {}", replicaId );
138        SchemaManager schemaManager = directoryService.getSchemaManager();
139        this.replicaId = replicaId;
140        this.searchCriteria = new NotificationCriteria( schemaManager );
141        this.searchCriteria.setEventMask( EventType.ALL_EVENT_TYPES_MASK );
142
143        // Create the journal file, or open if already exists
144        File replDir = directoryService.getInstanceLayout().getReplDirectory();
145        journalFile = new File( replDir, REPLICA_EVENT_LOG_NAME_PREFIX + replicaId );
146        recman = new BaseRecordManager( journalFile.getAbsolutePath() );
147        TransactionManager transactionManager = ( ( BaseRecordManager ) recman ).getTransactionManager();
148        transactionManager.setMaximumTransactionsInLog( 200 );
149
150        SerializableComparator<String> comparator = new SerializableComparator<>(
151            SchemaConstants.CSN_ORDERING_MATCH_MR_OID );
152        comparator.setSchemaManager( schemaManager );
153
154        journal = new JdbmTable<>( schemaManager, journalFile.getName(), recman, comparator,
155            StringSerializer.INSTANCE, new ReplicaEventMessageSerializer( schemaManager ) );
156        
157        this.partitionTxn = partitionTxn;
158    }
159
160
161    /**
162     * Stores the given message in the queue 
163     *
164     * @param message The message to store
165     */
166    public synchronized void log( ReplicaEventMessage message )
167    {
168        try
169        {
170            LOG.debug( "logging entry with Dn {} with the event {}", message.getEntry().getDn(),
171                message.getChangeType() );
172            PROVIDER_LOG.debug( "logging entry with Dn {} with the event {}", message.getEntry().getDn(),
173                message.getChangeType() );
174
175            String entryCsn = message.getEntry().get( SchemaConstants.ENTRY_CSN_AT ).getString();
176            journal.put( partitionTxn, entryCsn, message );
177        }
178        catch ( Exception e )
179        {
180            LOG.warn( "Failed to insert the entry into syncrepl log", e );
181            PROVIDER_LOG.error( "Failed to insert the entry into syncrepl log", e );
182        }
183    }
184
185
186    /**
187     * Deletes the queue (to remove the log) and recreates a new queue instance
188     * with the same queue name. Also creates the corresponding message producer
189     *
190     * @throws Exception If the queue can't be deleted
191     */
192    public void truncate() throws Exception
193    {
194    }
195
196
197    /**
198     * Re-create the queue
199     * @throws Exception If the creation has failed
200     */
201    public void recreate() throws Exception
202    {
203        LOG.debug( "recreating the queue for the replica id {}", replicaId );
204    }
205
206
207    /**
208     * Stop the EventLog
209     * 
210     * @throws Exception If the stop failed
211     */
212    public void stop() throws Exception
213    {
214        PROVIDER_LOG.debug( "Stopping the EventLog for replicaId {}", replicaId );
215
216        // Close the producer and session, DO NOT close connection 
217        if ( journal != null )
218        {
219            journal.close( partitionTxn );
220        }
221
222        journal = null;
223
224        if ( recman != null )
225        {
226            recman.close();
227        }
228
229        recman = null;
230    }
231
232
233    /**
234     * {@inheritDoc}
235     */
236    @Override
237    public boolean equals( Object obj )
238    {
239        if ( !( obj instanceof ReplicaEventLog ) )
240        {
241            return false;
242        }
243
244        ReplicaEventLog other = ( ReplicaEventLog ) obj;
245
246        return replicaId == other.getId();
247    }
248
249
250    /**
251     * {@inheritDoc}
252     */
253    @Override
254    public int hashCode()
255    {
256        int result = 17;
257        result = 31 * result + searchFilter.hashCode();
258        result = 31 * result + hostName.hashCode();
259
260        return result;
261    }
262
263
264    /**
265     * {@inheritDoc}
266     */
267    public int compareTo( ReplicaEventLog o )
268    {
269        if ( this.equals( o ) )
270        {
271            return 0;
272        }
273
274        return 1;
275    }
276
277
278    /**
279     * @return The listener
280     */
281    public SyncReplSearchListener getPersistentListener()
282    {
283        return persistentListener;
284    }
285
286
287    /**
288     * Set the listener
289     * @param persistentListener The listener
290     */
291    public void setPersistentListener( SyncReplSearchListener persistentListener )
292    {
293        this.persistentListener = persistentListener;
294    }
295
296
297    /**
298     * @return The search criteria
299     */
300    public NotificationCriteria getSearchCriteria()
301    {
302        return searchCriteria;
303    }
304
305
306    /**
307     * Stores the search criteria
308     * @param searchCriteria The search criteria
309     */
310    public void setSearchCriteria( NotificationCriteria searchCriteria )
311    {
312        this.searchCriteria = searchCriteria;
313    }
314
315
316    /**
317     * @return true if the consumer is in Refresh And Persist mode
318     */
319    public boolean isRefreshNPersist()
320    {
321        return refreshNPersist;
322    }
323
324
325    /**
326     * @param refreshNPersist if true, set the EventLog in Refresh and Persist mode
327     */
328    public void setRefreshNPersist( boolean refreshNPersist )
329    {
330        this.refreshNPersist = refreshNPersist;
331    }
332
333
334    /**
335     * @return The replica ID
336     */
337    public int getId()
338    {
339        return replicaId;
340    }
341
342
343    /**
344     * @return The last CSN sent by the consumer
345     */
346    public String getLastSentCsn()
347    {
348        return lastSentCsn;
349    }
350
351
352    /**
353     * Update the last Sent CSN. If it's different from the present one, we
354     * will set the dirty flag to true, and it will be stored in DIT.
355     *  
356     * @param lastSentCsn The new Sent CSN
357     */
358    public void setLastSentCsn( String lastSentCsn )
359    {
360        // set only if there is a change in cookie value
361        // this will avoid setting the dirty flag which eventually is used for
362        // storing the details of this log
363        if ( !lastSentCsn.equals( this.lastSentCsn ) )
364        {
365            this.lastSentCsn = lastSentCsn;
366            dirty = true;
367        }
368    }
369
370
371    /**
372     * @return The consumer Hostname
373     */
374    public String getHostName()
375    {
376        return hostName;
377    }
378
379
380    /**
381     * Set the consumer hostname
382     * @param hostName The consumer hostname
383     */
384    public void setHostName( String hostName )
385    {
386        this.hostName = hostName;
387    }
388
389
390    /**
391     * @return The searchFilter
392     */
393    public String getSearchFilter()
394    {
395        return searchFilter;
396    }
397
398
399    /**
400     * Set the searchFilter
401     * @param searchFilter The searchFilter
402     */
403    public void setSearchFilter( String searchFilter )
404    {
405        this.searchFilter = searchFilter;
406    }
407
408
409    /**
410     * @return True if the consumer is not up to date
411     */
412    public boolean isDirty()
413    {
414        return dirty;
415    }
416
417
418    /**
419     * Set the dirty flag
420     * @param dirty The current consumer status
421     */
422    public void setDirty( boolean dirty )
423    {
424        this.dirty = dirty;
425    }
426
427
428    /**
429     * @return The queue name
430     */
431    public String getQueueName()
432    {
433        return "replicaId=" + replicaId;
434    }
435
436
437    /**
438     * @param consumerCsn the consumer's CSN extracted from cookie
439     * @return A cursor on top of the queue
440     * @throws Exception If the cursor can't be created
441     */
442    public ReplicaJournalCursor getCursor( String consumerCsn ) throws Exception
443    {
444        return new ReplicaJournalCursor( partitionTxn, journal, consumerCsn );
445    }
446
447
448    /**
449     * @return the name of this replica log
450     */
451    public String getName()
452    {
453        return journal.getName();
454    }
455
456
457    /**
458     * @return the number of entries present in the replica log
459     */
460    public synchronized long count()
461    {
462        try
463        {
464            return journal.count( partitionTxn );
465        }
466        catch ( LdapException e )
467        {
468            throw new RuntimeException( e );
469        }
470    }
471
472
473    public long getMaxIdlePeriod()
474    {
475        return maxIdlePeriod;
476    }
477
478
479    public void setMaxIdlePeriod( long maxIdlePeriod )
480    {
481        if ( maxIdlePeriod <= 0 )
482        {
483            maxIdlePeriod = DEFAULT_MAX_IDLE_PERIOD;
484        }
485
486        this.maxIdlePeriod = maxIdlePeriod;
487    }
488
489
490    public int getPurgeThresholdCount()
491    {
492        return purgeThresholdCount;
493    }
494
495
496    public void setPurgeThresholdCount( int purgeThresholdCount )
497    {
498        if ( purgeThresholdCount <= 0 )
499        {
500            purgeThresholdCount = DEFAULT_PURGE_THRESHOLD_COUNT;
501        }
502
503        this.purgeThresholdCount = purgeThresholdCount;
504    }
505
506
507    public Dn getConsumerEntryDn()
508    {
509        return consumerEntryDn;
510    }
511
512
513    public void setConsumerEntryDn( Dn consumerEntryDn )
514    {
515        this.consumerEntryDn = consumerEntryDn;
516    }
517
518
519    @Override
520    public String toString()
521    {
522        return "ReplicaEventLog [hostName=" + hostName + ", searchFilter=" + searchFilter + ", lastSentCsn="
523            + lastSentCsn + ", searchCriteria=" + searchCriteria + ", replicaId=" + replicaId
524            + ", refreshNPersist=" + refreshNPersist + ", maxInactivePeriod=" + maxIdlePeriod
525            + ", purgeThresholdCount=" + purgeThresholdCount + ", journalFile=" + journalFile
526            + ", dirty=" + dirty + ", consumerEntryDn=" + consumerEntryDn + "]";
527    }
528}