001/*
002 *   Licensed to the Apache Software Foundation (ASF) under one
003 *   or more contributor license agreements.  See the NOTICE file
004 *   distributed with this work for additional information
005 *   regarding copyright ownership.  The ASF licenses this file
006 *   to you under the Apache License, Version 2.0 (the
007 *   "License"); you may not use this file except in compliance
008 *   with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *   Unless required by applicable law or agreed to in writing,
013 *   software distributed under the License is distributed on an
014 *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *   KIND, either express or implied.  See the License for the
016 *   specific language governing permissions and limitations
017 *   under the License.
018 *
019 */
020package org.apache.directory.server.ldap.replication.provider;
021
022
023import java.io.File;
024import java.util.Map;
025
026import org.apache.directory.api.ldap.model.constants.SchemaConstants;
027import org.apache.directory.api.ldap.model.csn.Csn;
028import org.apache.directory.api.ldap.model.exception.LdapException;
029import org.apache.directory.server.core.api.DirectoryService;
030import org.apache.directory.server.ldap.replication.ReplicaEventMessage;
031import org.slf4j.Logger;
032import org.slf4j.LoggerFactory;
033
034
035/**
036 * Deletes old entries from the replication event logs that are configured in refreshNPersist mode.
037 * 
038 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
039 */
040public class ReplicaEventLogJanitor extends Thread
041{
042    private static final Logger LOG = LoggerFactory.getLogger( ReplicaEventLogJanitor.class );
043
044    private DirectoryService directoryService;
045
046    private Map<Integer, ReplicaEventLog> replicaLogMap;
047
048    private volatile boolean stop = false;
049
050    /** A lock used to wait */
051    final Object lock = new Object();
052
053    /** time the janitor thread sleeps before successive cleanup attempts. Default value is 5 minutes */
054    private long sleepTime = 5 * 60 * 1000L;
055
056    private long thresholdTime = 2 * 60 * 60 * 1000L;
057
058
059    public ReplicaEventLogJanitor( final DirectoryService directoryService,
060        final Map<Integer, ReplicaEventLog> replicaLogMap )
061    {
062        // if log is in refreshNpersist mode, has more entries than the log's threshold count then 
063        // all the entries before the last sent CSN and older than 2 hours will be purged
064        this.directoryService = directoryService;
065        this.replicaLogMap = replicaLogMap;
066        setDaemon( true );
067    }
068
069
070    @Override
071    public void run()
072    {
073        while ( !stop )
074        {
075            for ( ReplicaEventLog log : replicaLogMap.values() )
076            {
077                synchronized ( log ) // lock the log and clean
078                {
079                    try
080                    {
081                        String lastSentCsn = log.getLastSentCsn();
082
083                        if ( lastSentCsn == null )
084                        {
085                            LOG.debug( "last sent CSN is null for the replica {}, skipping cleanup", log.getName() );
086                            return;
087                        }
088
089                        long now = directoryService.getTimeProvider().currentIimeMillis();
090
091                        long maxIdleTime = log.getMaxIdlePeriod() * 1000L;
092
093                        long lastUpdatedTime = new Csn( lastSentCsn ).getTimestamp();
094
095                        LOG.debug( "checking log idle time now={} lastUpdatedTime={} maxIdleTime={}", now,
096                            lastUpdatedTime, maxIdleTime );
097
098                        // DO NOT delete those with maxIdleTime <= 0
099                        if ( ( maxIdleTime > 0 ) && ( now - lastUpdatedTime ) >= maxIdleTime )
100                        {
101                            //max idle time of the event log reached, delete it
102                            removeEventLog( log );
103
104                            // delete the associated entry from DiT, note that ConsumerLogEntryDeleteListener 
105                            // will get called eventually but removeEventLog() will not be called cause by 
106                            // that time this log will not be present in replicaLogMap
107                            // The reason we don't call this method first is to guard against any rename
108                            // operation performed on the log's entry in DiT
109                            try
110                            {
111                                directoryService.getAdminSession().delete( log.getConsumerEntryDn() );
112                            }
113                            catch ( LdapException e )
114                            {
115                                LOG.warn( "Failed to delete the entry {} of replica event log {}",
116                                    log.getConsumerEntryDn(), log.getName(), e );
117                            }
118
119                            continue;
120                        }
121
122                        long thresholdCount = log.getPurgeThresholdCount();
123
124                        if ( log.count() < thresholdCount )
125                        {
126                            continue;
127                        }
128
129                        LOG.debug( "starting to purge the log entries that are older than {} milliseconds",
130                            thresholdTime );
131
132                        long deleteCount = 0;
133
134                        ReplicaJournalCursor cursor = log.getCursor( null ); // pass no CSN
135                        cursor.skipQualifyingWhileFetching();
136
137                        while ( cursor.next() )
138                        {
139                            ReplicaEventMessage message = cursor.get();
140                            String csnVal = message.getEntry().get( SchemaConstants.ENTRY_CSN_AT ).getString();
141
142                            // skip if we reach the lastSentCsn or got past it
143                            if ( csnVal.compareTo( lastSentCsn ) >= 0 )
144                            {
145                                break;
146                            }
147
148                            Csn csn = new Csn( csnVal );
149
150                            if ( ( now - csn.getTimestamp() ) >= thresholdTime )
151                            {
152                                cursor.delete();
153                                deleteCount++;
154                            }
155                        }
156
157                        cursor.close();
158
159                        LOG.debug( "purged {} messages from the log {}", deleteCount, log.getName() );
160                    }
161                    catch ( Exception e )
162                    {
163                        LOG.warn( "Failed to purge old entries from the log {}", log.getName(), e );
164                    }
165                }
166            }
167
168            try
169            {
170                synchronized ( lock )
171                {
172                    lock.wait( sleepTime );
173                }
174            }
175            catch ( InterruptedException e )
176            {
177                LOG.warn( "ReplicaEventLogJanitor thread was interrupted, processing logs for cleanup", e );
178            }
179        }
180    }
181
182
183    public synchronized void removeEventLog( ReplicaEventLog replicaEventLog )
184    {
185        directoryService.getEventService().removeListener( replicaEventLog.getPersistentListener() );
186        String name = replicaEventLog.getName();
187        LOG.debug( "removed the persistent listener for replication event log {}", name );
188
189        replicaLogMap.remove( replicaEventLog.getId() );
190
191        try
192        {
193            replicaEventLog.stop();
194
195            new File( directoryService.getInstanceLayout().getReplDirectory(), name + ".db" ).delete();
196            new File( directoryService.getInstanceLayout().getReplDirectory(), name + ".lg" ).delete();
197            LOG.info( "successfully removed replication event log {}", name );
198        }
199        catch ( Exception e )
200        {
201            LOG.warn(
202                "Closing the replication event log of the entry {} was not successful, will be removed anyway",
203                name, e );
204        }
205    }
206
207
208    public void setSleepTime( long sleepTime )
209    {
210        this.sleepTime = sleepTime;
211    }
212
213
214    public long getSleepTime()
215    {
216        return sleepTime;
217    }
218
219
220    public void stopCleaning()
221    {
222        stop = true;
223
224        synchronized ( lock )
225        {
226            lock.notify();
227        }
228    }
229}