001/*
002 *   Licensed to the Apache Software Foundation (ASF) under one
003 *   or more contributor license agreements.  See the NOTICE file
004 *   distributed with this work for additional information
005 *   regarding copyright ownership.  The ASF licenses this file
006 *   to you under the Apache License, Version 2.0 (the
007 *   "License"); you may not use this file except in compliance
008 *   with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *   Unless required by applicable law or agreed to in writing,
013 *   software distributed under the License is distributed on an
014 *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *   KIND, either express or implied.  See the License for the
016 *   specific language governing permissions and limitations
017 *   under the License.
018 *
019 */
020package org.apache.directory.server.ldap.replication.consumer;
021
022
023import java.util.Queue;
024import java.util.concurrent.ConcurrentLinkedQueue;
025
026import org.apache.directory.api.ldap.model.constants.Loggers;
027import org.slf4j.Logger;
028import org.slf4j.LoggerFactory;
029import org.slf4j.MDC;
030
031
032/**
033 * A thread used to ping the provider o check if they are alive or not.
034 *
035 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
036 */
037public class PingerThread extends Thread
038{
039    /** Logger for the replication consumer */
040    private static final Logger CONSUMER_LOG = LoggerFactory.getLogger( Loggers.CONSUMER_LOG.getName() );
041
042    /** The list of consumers we want to check */
043    private Queue<ReplicationConsumer> consumers = new ConcurrentLinkedQueue<>();
044
045    /** A flag to stop the pinger */
046    private boolean stop = false;
047
048    /** the time interval before this thread pings each replication provider. Default value is 5 seconds */
049    private long sleepTime = 5000;
050
051    /**
052     * Create a new instance of this thread.
053     * 
054     * @param sleepSec the number of seconds pinger thread should sleep before pinging the providers
055     */
056    public PingerThread( int sleepSec )
057    {
058        if ( sleepSec > 0 )
059        {
060            sleepTime = sleepSec * 1000L;
061        }
062        
063        CONSUMER_LOG.info( "Configured pinger thread to sleep for {} seconds", ( sleepTime / 1000 ) );
064        
065        setDaemon( true );
066    }
067
068
069    /**
070     * Starts the thread
071     */
072    @Override
073    public void run()
074    {
075        try
076        {
077            if ( CONSUMER_LOG.isDebugEnabled() )
078            {
079                MDC.put( "Replica", "Pinger" );
080
081                CONSUMER_LOG.debug( "Starting the provider's pinger" );
082            }
083
084            while ( !stop )
085            {
086                for ( ReplicationConsumer consumer : consumers )
087                {
088                    consumer.ping();
089                }
090
091                Thread.sleep( sleepTime );
092            }
093        }
094        catch ( InterruptedException ie )
095        {
096            CONSUMER_LOG.debug( "The pinger has been interrupted" );
097        }
098    }
099
100
101    /**
102     * Add a new consumer to ping
103     * 
104     * @param consumer The consumer we want to ping
105     */
106    public void addConsumer( ReplicationConsumer consumer )
107    {
108        if ( !consumers.contains( consumer ) )
109        {
110            consumers.add( consumer );
111        }
112    }
113
114
115    /**
116     * Remove a consumer to ping
117     * @param consumer The consumer we want to remove
118     */
119    public void removeConsumer( ReplicationConsumer consumer )
120    {
121        consumers.remove( consumer );
122    }
123
124
125    /**
126     * Stops the ping for all the consumers
127     */
128    public void stopPinging()
129    {
130        stop = true;
131    }
132}