001/*
002 *  Licensed to the Apache Software Foundation (ASF) under one
003 *  or more contributor license agreements.  See the NOTICE file
004 *  distributed with this work for additional information
005 *  regarding copyright ownership.  The ASF licenses this file
006 *  to you under the Apache License, Version 2.0 (the
007 *  "License"); you may not use this file except in compliance
008 *  with the License.  You may obtain a copy of the License at
009 *
010 *    http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *  Unless required by applicable law or agreed to in writing,
013 *  software distributed under the License is distributed on an
014 *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *  KIND, either express or implied.  See the License for the
016 *  specific language governing permissions and limitations
017 *  under the License.
018 *
019 */
020package org.apache.directory.server.ldap.replication.consumer;
021
022
023import java.util.ArrayList;
024import java.util.Arrays;
025import java.util.Iterator;
026import java.util.List;
027import java.util.Map;
028
029import org.apache.commons.collections4.map.LRUMap;
030import org.apache.directory.api.ldap.extras.controls.SynchronizationModeEnum;
031import org.apache.directory.api.ldap.extras.controls.syncrepl.syncDone.SyncDoneValue;
032import org.apache.directory.api.ldap.extras.controls.syncrepl.syncRequest.SyncRequestValue;
033import org.apache.directory.api.ldap.extras.controls.syncrepl.syncRequest.SyncRequestValueImpl;
034import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateTypeEnum;
035import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateValue;
036import org.apache.directory.api.ldap.extras.intermediate.syncrepl.SyncInfoValue;
037import org.apache.directory.api.ldap.extras.intermediate.syncrepl.SyncInfoValueImpl;
038import org.apache.directory.api.ldap.model.constants.Loggers;
039import org.apache.directory.api.ldap.model.constants.SchemaConstants;
040import org.apache.directory.api.ldap.model.csn.Csn;
041import org.apache.directory.api.ldap.model.cursor.Cursor;
042import org.apache.directory.api.ldap.model.entry.Attribute;
043import org.apache.directory.api.ldap.model.entry.DefaultAttribute;
044import org.apache.directory.api.ldap.model.entry.DefaultEntry;
045import org.apache.directory.api.ldap.model.entry.DefaultModification;
046import org.apache.directory.api.ldap.model.entry.Entry;
047import org.apache.directory.api.ldap.model.entry.Modification;
048import org.apache.directory.api.ldap.model.entry.ModificationOperation;
049import org.apache.directory.api.ldap.model.exception.LdapException;
050import org.apache.directory.api.ldap.model.exception.LdapNoSuchObjectException;
051import org.apache.directory.api.ldap.model.filter.AndNode;
052import org.apache.directory.api.ldap.model.filter.EqualityNode;
053import org.apache.directory.api.ldap.model.filter.ExprNode;
054import org.apache.directory.api.ldap.model.filter.NotNode;
055import org.apache.directory.api.ldap.model.filter.OrNode;
056import org.apache.directory.api.ldap.model.filter.PresenceNode;
057import org.apache.directory.api.ldap.model.message.AliasDerefMode;
058import org.apache.directory.api.ldap.model.message.IntermediateResponse;
059import org.apache.directory.api.ldap.model.message.Response;
060import org.apache.directory.api.ldap.model.message.ResultCodeEnum;
061import org.apache.directory.api.ldap.model.message.SearchRequest;
062import org.apache.directory.api.ldap.model.message.SearchRequestImpl;
063import org.apache.directory.api.ldap.model.message.SearchResultDone;
064import org.apache.directory.api.ldap.model.message.SearchResultEntry;
065import org.apache.directory.api.ldap.model.message.SearchResultReference;
066import org.apache.directory.api.ldap.model.message.SearchScope;
067import org.apache.directory.api.ldap.model.message.controls.ManageDsaITImpl;
068import org.apache.directory.api.ldap.model.message.controls.SortKey;
069import org.apache.directory.api.ldap.model.message.controls.SortRequest;
070import org.apache.directory.api.ldap.model.message.controls.SortRequestImpl;
071import org.apache.directory.api.ldap.model.name.Dn;
072import org.apache.directory.api.ldap.model.name.Rdn;
073import org.apache.directory.api.ldap.model.schema.AttributeType;
074import org.apache.directory.api.ldap.model.schema.SchemaManager;
075import org.apache.directory.api.util.StringConstants;
076import org.apache.directory.api.util.Strings;
077import org.apache.directory.ldap.client.api.ConnectionClosedEventListener;
078import org.apache.directory.ldap.client.api.LdapNetworkConnection;
079import org.apache.directory.ldap.client.api.future.SearchFuture;
080import org.apache.directory.server.constants.ApacheSchemaConstants;
081import org.apache.directory.server.core.api.CoreSession;
082import org.apache.directory.server.core.api.DirectoryService;
083import org.apache.directory.server.core.api.OperationManager;
084import org.apache.directory.server.core.api.interceptor.context.AddOperationContext;
085import org.apache.directory.server.core.api.interceptor.context.DeleteOperationContext;
086import org.apache.directory.server.core.api.interceptor.context.LookupOperationContext;
087import org.apache.directory.server.core.api.interceptor.context.ModifyOperationContext;
088import org.apache.directory.server.core.api.interceptor.context.MoveAndRenameOperationContext;
089import org.apache.directory.server.core.api.interceptor.context.MoveOperationContext;
090import org.apache.directory.server.core.api.interceptor.context.RenameOperationContext;
091import org.apache.directory.server.core.api.partition.Partition;
092import org.apache.directory.server.core.api.partition.PartitionTxn;
093import org.apache.directory.server.ldap.LdapProtocolUtils;
094import org.apache.directory.server.ldap.replication.ReplicationConsumerConfig;
095import org.apache.directory.server.ldap.replication.SyncReplConfiguration;
096import org.slf4j.Logger;
097import org.slf4j.LoggerFactory;
098import org.slf4j.MDC;
099
100
101/**
102 * Implementation of syncrepl slave a.k.a consumer.
103 *
104 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
105 */
106public class ReplicationConsumerImpl implements ConnectionClosedEventListener, ReplicationConsumer
107{
108    /** A dedicated logger for the consumer */
109    private static final Logger CONSUMER_LOG = LoggerFactory.getLogger( Loggers.CONSUMER_LOG.getName() );
110
111    /** the syncrepl configuration */
112    private SyncReplConfiguration config;
113
114    /** the sync cookie sent by the server */
115    private byte[] syncCookie;
116
117    /** connection to the syncrepl provider */
118    private LdapNetworkConnection connection;
119
120    /** the search request with control */
121    private SearchRequest searchRequest;
122
123    /** a reference to the directoryService */
124    private DirectoryService directoryService;
125
126    /** the schema manager */
127    private SchemaManager schemaManager;
128
129    /** flag to indicate whether the consumer was disconnected */
130    private volatile boolean disconnected;
131
132    /** the core session */
133    private CoreSession session;
134
135    /** attributes on which modification should be ignored */
136    private static final String[] MOD_IGNORE_AT = new String[]
137        {
138            SchemaConstants.ENTRY_UUID_AT,
139            SchemaConstants.ENTRY_DN_AT,
140            SchemaConstants.CREATE_TIMESTAMP_AT,
141            SchemaConstants.CREATORS_NAME_AT,
142            ApacheSchemaConstants.ENTRY_PARENT_ID_AT,
143            SchemaConstants.COLLECTIVE_ATTRIBUTE_SUBENTRIES_AT,
144            SchemaConstants.CONTEXT_CSN_AT,
145            ApacheSchemaConstants.NB_CHILDREN_AT,
146            ApacheSchemaConstants.NB_SUBORDINATES_AT,
147            SchemaConstants.HAS_SUBORDINATES_AT,
148            SchemaConstants.STRUCTURAL_OBJECT_CLASS_AT,
149    };
150
151    /** the cookie that was saved last time */
152    private byte[] lastSavedCookie;
153
154    private volatile boolean reload = false;
155
156    /** The (entrtyUuid=*) filter */
157    private static final PresenceNode ENTRY_UUID_PRESENCE_FILTER = new PresenceNode( SchemaConstants.ENTRY_UUID_AT );
158
159    private Modification cookieMod;
160
161    private Modification ridMod;
162
163    /** AttributeTypes used for replication */
164    private AttributeType adsReplCookieAT;
165    private AttributeType adsDsReplicaIdAT;
166
167    private static final Map<String, Object> UUID_LOCK_MAP = new LRUMap( 1000 );
168
169
170    /**
171     * @return the config
172     */
173    @Override
174    public SyncReplConfiguration getConfig()
175    {
176        return config;
177    }
178
179
180    /**
181     * Init the replication service
182     * @param directoryservice The directory service
183     */
184    @Override
185    public void init( DirectoryService directoryservice ) throws Exception
186    {
187        this.directoryService = directoryservice;
188
189        session = directoryService.getAdminSession();
190
191        schemaManager = directoryservice.getSchemaManager();
192
193        adsReplCookieAT = schemaManager.lookupAttributeTypeRegistry( SchemaConstants.ADS_REPL_COOKIE );
194        adsDsReplicaIdAT = schemaManager.lookupAttributeTypeRegistry( SchemaConstants.ADS_DS_REPLICA_ID );
195
196        Attribute cookieAttr = new DefaultAttribute( adsReplCookieAT );
197        cookieMod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, cookieAttr );
198
199        Attribute ridAttr = new DefaultAttribute( adsDsReplicaIdAT );
200        ridMod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, ridAttr );
201
202        prepareSyncSearchRequest();
203    }
204
205
206    /**
207     * Connect to the remote server. Note that a SyncRepl consumer will be connected to only
208     * one remote server
209     *
210     * @return true if the connections have been successful.
211     */
212    public boolean connect()
213    {
214        String providerHost = config.getRemoteHost();
215        int port = config.getRemotePort();
216
217        try
218        {
219            // Create a connection
220            if ( connection == null )
221            {
222                connection = new LdapNetworkConnection( providerHost, port );
223                connection.setSchemaManager( schemaManager );
224
225                if ( config.isUseTls() )
226                {
227                    connection.getConfig().setTrustManagers( config.getTrustManager() );
228                    connection.getConfig().setUseTls( true );
229                }
230
231                connection.addConnectionClosedEventListener( this );
232            }
233
234            // Try to connect
235            if ( connection.connect() )
236            {
237                CONSUMER_LOG.info( "Consumer {} connected to producer {}", config.getReplicaId(), config.getProducer() );
238
239                // Do a bind
240                try
241                {
242                    connection.bind( config.getReplUserDn(), Strings.utf8ToString( config.getReplUserPassword() ) );
243                    disconnected = false;
244
245                    return true;
246                }
247                catch ( LdapException le )
248                {
249                    CONSUMER_LOG.warn( "Failed to bind to the producer {} with the given bind Dn {}",
250                        config.getProducer(), config.getReplUserDn() );
251                    CONSUMER_LOG.warn( "", le );
252                    disconnected = true;
253                }
254            }
255            else
256            {
257                CONSUMER_LOG.warn( "Consumer {} cannot connect to producer {}", config.getReplicaId(),
258                    config.getProducer() );
259                disconnected = true;
260
261                return false;
262            }
263        }
264        catch ( Exception e )
265        {
266            CONSUMER_LOG.error( "Failed to connect to the producer {}, cause : {}", config.getProducer(),
267                e.getMessage() );
268            disconnected = true;
269        }
270
271        return false;
272    }
273
274
275    /**
276     *  prepares a SearchRequest for syncing DIT content.
277     *
278     */
279    private void prepareSyncSearchRequest() throws LdapException
280    {
281        String baseDn = config.getBaseDn();
282
283        searchRequest = new SearchRequestImpl();
284
285        searchRequest.setBase( new Dn( baseDn ) );
286        searchRequest.setFilter( config.getFilter() );
287        searchRequest.setSizeLimit( config.getSearchSizeLimit() );
288        searchRequest.setTimeLimit( config.getSearchTimeout() );
289
290        searchRequest.setDerefAliases( config.getAliasDerefMode() );
291        searchRequest.setScope( config.getSearchScope() );
292        searchRequest.setTypesOnly( false );
293
294        searchRequest.addAttributes( config.getAttributes() );
295
296        if ( !config.isChaseReferrals() )
297        {
298            searchRequest.addControl( new ManageDsaITImpl() );
299        }
300
301        if ( CONSUMER_LOG.isDebugEnabled() )
302        {
303            MDC.put( "Replica", Integer.toString( config.getReplicaId() ) );
304            CONSUMER_LOG.debug( "Configuring consumer {}", config );
305        }
306    }
307
308
309    private ResultCodeEnum handleSearchResultDone( SearchResultDone searchDone )
310    {
311        CONSUMER_LOG.debug( "///////////////// handleSearchDone //////////////////" );
312
313        SyncDoneValue ctrl = ( SyncDoneValue ) searchDone.getControls().get( SyncDoneValue.OID );
314
315        if ( ( ctrl != null ) && ( ctrl.getCookie() != null ) )
316        {
317            syncCookie = ctrl.getCookie();
318            CONSUMER_LOG.debug( "assigning cookie from sync done value control: {}", Strings.utf8ToString( syncCookie ) );
319            storeCookie();
320        }
321
322        CONSUMER_LOG.debug( "//////////////// END handleSearchDone//////////////////////" );
323
324        reload = false;
325
326        return searchDone.getLdapResult().getResultCode();
327    }
328
329
330    private void handleSearchReference( SearchResultReference searchRef )
331    {
332        // this method won't be called cause the provider will serve the referrals as
333        // normal entry objects due to the usage of ManageDsaITControl in the search request
334    }
335
336
337    /**
338     * Process a SearchResultEntry received from a consumer. We have to handle all the
339     * cases :
340     * - Add
341     * - Modify
342     * - Moddn
343     * - Delete
344     * - Present
345     * @param syncResult
346     */
347    private void handleSearchResultEntry( SearchResultEntry syncResult )
348    {
349        CONSUMER_LOG.debug( "------------- starting handleSearchResult ------------" );
350
351        SyncStateValue syncStateCtrl = ( SyncStateValue ) syncResult.getControl( SyncStateValue.OID );
352
353        try
354        {
355            Entry remoteEntry = new DefaultEntry( schemaManager, syncResult.getEntry() );
356            String uuid = remoteEntry.get( directoryService.getAtProvider().getEntryUUID() ).getString();
357            // lock on UUID to serialize the updates when there are multiple consumers
358            // connected to several producers and to the *same* base/partition
359            Object lock = getLockFor( uuid );
360
361            synchronized ( lock )
362            {
363                int rid = -1;
364
365                if ( syncStateCtrl.getCookie() != null )
366                {
367                    syncCookie = syncStateCtrl.getCookie();
368                    rid = LdapProtocolUtils.getReplicaId( Strings.utf8ToString( syncCookie ) );
369                    CONSUMER_LOG.debug( "assigning the cookie from sync state value control: {}",
370                        Strings.utf8ToString( syncCookie ) );
371                }
372
373                SyncStateTypeEnum state = syncStateCtrl.getSyncStateType();
374
375                // check to avoid conversion of UUID from byte[] to String
376                if ( CONSUMER_LOG.isDebugEnabled() )
377                {
378                    CONSUMER_LOG.debug( "state name {}", state.name() );
379                    CONSUMER_LOG.debug( "entryUUID = {}", Strings.uuidToString( syncStateCtrl.getEntryUUID() ) );
380                }
381
382                Dn remoteDn = remoteEntry.getDn();
383
384                switch ( state )
385                {
386                    case ADD:
387                        boolean remoteDnExist = false;
388
389                        try
390                        {
391                            remoteDnExist = session.exists( remoteDn );
392                        }
393                        catch ( LdapNoSuchObjectException lnsoe )
394                        {
395                            CONSUMER_LOG.error( lnsoe.getMessage() );
396                        }
397
398                        if ( !remoteDnExist )
399                        {
400                            CONSUMER_LOG.debug( "adding entry with dn {}", remoteDn );
401                            CONSUMER_LOG.debug( remoteEntry.toString() );
402                            AddOperationContext addContext = new AddOperationContext( session, remoteEntry );
403                            addContext.setReplEvent( true );
404                            addContext.setRid( rid );
405
406                            OperationManager operationManager = directoryService.getOperationManager();
407                            operationManager.add( addContext );
408                        }
409                        else
410                        {
411                            CONSUMER_LOG.debug( "updating entry in refreshOnly mode {}", remoteDn );
412                            modify( remoteEntry, rid );
413                        }
414
415                        break;
416
417                    case MODIFY:
418                        CONSUMER_LOG.debug( "modifying entry with dn {}", remoteEntry.getDn().getName() );
419                        modify( remoteEntry, rid );
420
421                        break;
422
423                    case MODDN:
424                        String entryUuid = Strings.uuidToString( syncStateCtrl.getEntryUUID() );
425                        applyModDnOperation( remoteEntry, entryUuid, rid );
426
427                        break;
428
429                    case DELETE:
430                        CONSUMER_LOG.debug( "deleting entry with dn {}", remoteEntry.getDn().getName() );
431
432                        if ( !session.exists( remoteDn ) )
433                        {
434                            CONSUMER_LOG
435                                .debug(
436                                    "looks like entry {} was already deleted in a prior update (possibly from another provider), skipping delete",
437                                    remoteDn );
438                        }
439                        else
440                        {
441                            // incase of a MODDN operation resulting in a branch to be moved out of scope
442                            // ApacheDS replication provider sends a single delete event on the Dn of the moved branch
443                            // so the branch needs to be recursively deleted here
444                            deleteRecursive( remoteEntry.getDn(), rid );
445                        }
446
447                        break;
448
449                    case PRESENT:
450                        CONSUMER_LOG.debug( "entry present {}", remoteEntry );
451                        break;
452
453                    default:
454                        throw new IllegalArgumentException( "Unexpected sync state " + state );
455                }
456
457                // store the cookie only if the above operation was successful
458                if ( syncStateCtrl.getCookie() != null )
459                {
460                    storeCookie();
461                }
462            }
463        }
464        catch ( Exception e )
465        {
466            CONSUMER_LOG.error( e.getMessage(), e );
467        }
468
469        CONSUMER_LOG.debug( "------------- Ending handleSearchResult ------------" );
470    }
471
472
473    /**
474     * {@inheritDoc}
475     */
476    private void handleSyncInfo( IntermediateResponse syncInfoResp )
477    {
478        try
479        {
480            CONSUMER_LOG.debug( "............... inside handleSyncInfo ..............." );
481
482            byte[] syncInfoBytes = syncInfoResp.getResponseValue();
483
484            if ( syncInfoBytes == null )
485            {
486                return;
487            }
488
489            SyncInfoValue syncInfoValue = new SyncInfoValueImpl();
490
491            byte[] cookie = syncInfoValue.getCookie();
492
493            if ( CONSUMER_LOG.isDebugEnabled() )
494            {
495                CONSUMER_LOG.debug( "Received a SyncInfoValue from producer {} : {}", config.getProducer(),
496                    syncInfoValue );
497            }
498
499            int replicaId = -1;
500
501            if ( cookie != null )
502            {
503                if ( CONSUMER_LOG.isDebugEnabled() )
504                {
505                    CONSUMER_LOG.debug( "setting the cookie from the sync info: {}", Strings.utf8ToString( cookie ) );
506                    CONSUMER_LOG.debug( "setting the cookie from the sync info: {}", Strings.utf8ToString( cookie ) );
507                }
508
509                syncCookie = cookie;
510
511                String cookieString = Strings.utf8ToString( syncCookie );
512                replicaId = LdapProtocolUtils.getReplicaId( cookieString );
513            }
514
515            CONSUMER_LOG.info( "refreshDeletes: {}", syncInfoValue.isRefreshDeletes() );
516
517            List<byte[]> uuidList = syncInfoValue.getSyncUUIDs();
518
519            // if refreshDeletes set to true then delete all the entries with entryUUID
520            // present in the syncIdSet
521            if ( syncInfoValue.isRefreshDeletes() )
522            {
523                deleteEntries( uuidList, false, replicaId );
524            }
525            else
526            {
527                deleteEntries( uuidList, true, replicaId );
528            }
529
530            CONSUMER_LOG.info( "refreshDone: {}", syncInfoValue.isRefreshDone() );
531
532            storeCookie();
533        }
534        catch ( Exception de )
535        {
536            CONSUMER_LOG.error( "Failed to handle syncinfo message", de );
537        }
538
539        CONSUMER_LOG.debug( ".................... END handleSyncInfo ..............." );
540    }
541
542
543    /**
544     * {@inheritDoc}
545     */
546    @Override
547    public void connectionClosed()
548    {
549        if ( CONSUMER_LOG.isDebugEnabled() )
550        {
551            MDC.put( "Replica", Integer.toString( config.getReplicaId() ) );
552            CONSUMER_LOG.debug( "Consumer {} session with {} has been closed ", config.getReplicaId(),
553                config.getProducer() );
554        }
555
556        disconnect();
557    }
558
559
560    /**
561     * Starts the synchronization operation
562     */
563    @Override
564    public ReplicationStatusEnum startSync()
565    {
566        CONSUMER_LOG.debug( "Starting the SyncRepl process for consumer {}", config.getReplicaId() );
567
568        // read the cookie if persisted
569        readCookie();
570
571        if ( config.isRefreshNPersist() )
572        {
573            try
574            {
575                CONSUMER_LOG.debug( "==================== Refresh And Persist ==========" );
576
577                return doSyncSearch( SynchronizationModeEnum.REFRESH_AND_PERSIST, reload );
578            }
579            catch ( Exception e )
580            {
581                CONSUMER_LOG.error( "Failed to sync with refreshAndPersist mode", e );
582                return ReplicationStatusEnum.DISCONNECTED;
583            }
584        }
585        else
586        {
587            return doRefreshOnly();
588        }
589    }
590
591
592    private ReplicationStatusEnum doRefreshOnly()
593    {
594        while ( !disconnected )
595        {
596            CONSUMER_LOG.debug( "==================== Refresh Only ==========" );
597
598            try
599            {
600                doSyncSearch( SynchronizationModeEnum.REFRESH_ONLY, reload );
601
602                CONSUMER_LOG.debug( "--------------------- Sleep for {} seconds ------------------",
603                    ( config.getRefreshInterval() / 1000 ) );
604                Thread.sleep( config.getRefreshInterval() );
605                CONSUMER_LOG.debug( "--------------------- syncing again ------------------" );
606
607            }
608            catch ( InterruptedException ie )
609            {
610                CONSUMER_LOG.warn( "refresher thread interrupted" );
611
612                return ReplicationStatusEnum.DISCONNECTED;
613            }
614            catch ( Exception e )
615            {
616                CONSUMER_LOG.error( "Failed to sync with refresh only mode", e );
617                return ReplicationStatusEnum.DISCONNECTED;
618            }
619        }
620
621        return ReplicationStatusEnum.STOPPED;
622    }
623
624
625    /**
626     * {@inheritDoc}
627     */
628    @Override
629    public void setConfig( ReplicationConsumerConfig config )
630    {
631        this.config = ( SyncReplConfiguration ) config;
632    }
633
634
635    /**
636     * {@inheritDoc}
637     */
638    @Override
639    public boolean connect( boolean now )
640    {
641        boolean connected = false;
642
643        if ( now )
644        {
645            connected = connect();
646        }
647
648        while ( !connected )
649        {
650            try
651            {
652                CONSUMER_LOG.debug( "Consumer {} cannot connect to {}, wait 5 seconds.", config.getReplicaId(),
653                    config.getProducer() );
654
655                // try to establish a connection for every 5 seconds
656                Thread.sleep( 5000 );
657            }
658            catch ( InterruptedException e )
659            {
660                CONSUMER_LOG.warn( "Consumer {} Interrupted while trying to reconnect to the provider {}",
661                    config.getReplicaId(), config.getProducer() );
662            }
663
664            connected = connect();
665        }
666
667        // TODO : we may have cases were we get here with the connected flag to false. With the above
668        // code, thi sis not possible
669
670        return connected;
671    }
672
673
674    /**
675     * {@inheritDoc}
676     */
677    @Override
678    public void ping()
679    {
680        boolean connected = !disconnected;
681
682        boolean restartSync = false;
683
684        if ( disconnected )
685        {
686            connected = connect();
687            restartSync = connected;
688        }
689
690        if ( connected )
691        {
692            CONSUMER_LOG.debug( "PING : The consumer {} is alive", config.getReplicaId() );
693
694            // DIRSERVER-2014
695            if ( restartSync )
696            {
697                CONSUMER_LOG.warn( "Restarting the disconnected consumer {}", config.getReplicaId() );
698                disconnected = false;
699                startSync();
700            }
701        }
702        else
703        {
704            CONSUMER_LOG.debug( "PING : The consumer {} cannot be connected", config.getReplicaId() );
705        }
706    }
707
708
709    /**
710     * {@inheritDoc}
711     */
712    @Override
713    public void stop()
714    {
715        if ( !disconnected )
716        {
717            disconnect();
718        }
719    }
720
721
722    /**
723     * {@inheritDoc}
724     */
725    @Override
726    public String getId()
727    {
728        return String.valueOf( getConfig().getReplicaId() );
729    }
730
731
732    /**
733     * Performs a search on connection with updated syncRequest control. The provider
734     * will initiate an UpdateContant or an initContent depending on the current consumer
735     * status, accordingly to the cookie's content.
736     * If the mode is refreshOnly, the server will send a SearchResultDone when all the modified
737     * entries have been sent.
738     * If the mode is refreshAndPersist, the provider never send a SearchResultDone, so we keep
739     * receiving modifications' notifications on the consumer, and never exit the loop, unless
740     * some communication error occurs.
741     *
742     * @param syncType The synchornization type, either REFRESH_ONLY or REFRESH_AND_PERSIST
743     * @param reloadHint A flag used to tell the server that we want a reload
744     * @return The replication status
745     * @throws Exception in case of any problems encountered while searching
746     */
747    private ReplicationStatusEnum doSyncSearch( SynchronizationModeEnum syncType, boolean reloadHint ) throws Exception
748    {
749        CONSUMER_LOG.debug( "Starting synchronization mode {}, reloadHint {}", syncType, reloadHint );
750        // Prepare the Syncrepl Request
751        SyncRequestValue syncReq = new SyncRequestValueImpl();
752
753        syncReq.setMode( syncType );
754        syncReq.setReloadHint( reloadHint );
755
756        // If we have a persisted cookie, send it.
757        if ( syncCookie != null )
758        {
759            CONSUMER_LOG.debug( "searching on {} with searchRequest, cookie '{}'", config.getProducer(),
760                Strings.utf8ToString( syncCookie ) );
761            syncReq.setCookie( syncCookie );
762        }
763        else
764        {
765            CONSUMER_LOG.debug( "searching on {} with searchRequest, no cookie", config.getProducer() );
766        }
767
768        searchRequest.addControl( syncReq );
769
770        // Do the search. We use a searchAsync because we want to get SearchResultDone responses
771        SearchFuture sf = connection.searchAsync( searchRequest );
772
773        Response resp = sf.get();
774
775        CONSUMER_LOG.debug( "Response from {} : {}", config.getProducer(), resp );
776
777        // Now, process the responses. We loop until we have a connection termination or
778        // a SearchResultDone (RefreshOnly mode)
779        while ( !( resp instanceof SearchResultDone ) && !sf.isCancelled() && !disconnected )
780        {
781            if ( resp instanceof SearchResultEntry )
782            {
783                SearchResultEntry result = ( SearchResultEntry ) resp;
784
785                handleSearchResultEntry( result );
786            }
787            else if ( resp instanceof SearchResultReference )
788            {
789                handleSearchReference( ( SearchResultReference ) resp );
790            }
791            else if ( resp instanceof IntermediateResponse )
792            {
793                handleSyncInfo( ( IntermediateResponse ) resp );
794            }
795
796            // Next entry
797            resp = sf.get();
798            CONSUMER_LOG.debug( "Response from {} : {}", config.getProducer(), resp );
799        }
800
801        if ( sf.isCancelled() )
802        {
803
804            CONSUMER_LOG.debug( "Search sync on {} has been canceled ", config.getProducer(), sf.getCause() );
805
806            return ReplicationStatusEnum.DISCONNECTED;
807        }
808        else if ( disconnected )
809        {
810            CONSUMER_LOG.debug( "Disconnected from {}", config.getProducer() );
811
812            return ReplicationStatusEnum.DISCONNECTED;
813        }
814        else
815        {
816            ResultCodeEnum resultCode = handleSearchResultDone( ( SearchResultDone ) resp );
817
818            CONSUMER_LOG.debug( "Rsultcode of Sync operation from {} : {}", config.getProducer(), resultCode );
819
820            if ( resultCode == ResultCodeEnum.NO_SUCH_OBJECT )
821            {
822                // log the error and handle it appropriately
823                CONSUMER_LOG.warn( "The base Dn {} is not found on provider {}", config.getBaseDn(),
824                    config.getProducer() );
825
826                CONSUMER_LOG.warn( "Disconnecting the Refresh&Persist consumer from provider {}", config.getProducer() );
827                disconnect();
828
829                return ReplicationStatusEnum.DISCONNECTED;
830            }
831            else if ( resultCode == ResultCodeEnum.E_SYNC_REFRESH_REQUIRED )
832            {
833                CONSUMER_LOG.warn( "Full SYNC_REFRESH required from {}", config.getProducer() );
834
835                reload = true;
836
837                try
838                {
839                    CONSUMER_LOG.debug( "Deleting baseDN {}", config.getBaseDn() );
840
841                    // FIXME taking a backup right before deleting might be a good thing, just to be safe.
842                    // the backup file can be deleted after reload completes successfully
843
844                    // the 'rid' value is not taken into consideration when 'reload' is set
845                    // so any dummy value is fine
846                    deleteRecursive( new Dn( config.getBaseDn() ), -1000 );
847                }
848                catch ( Exception e )
849                {
850                    CONSUMER_LOG
851                        .error(
852                            "Failed to delete the replica base as part of handling E_SYNC_REFRESH_REQUIRED, disconnecting the consumer",
853                            e );
854                }
855
856                // Do a full update.
857                removeCookie();
858
859                CONSUMER_LOG.debug( "Re-doing a syncRefresh from producer {}", config.getProducer() );
860
861                return ReplicationStatusEnum.REFRESH_REQUIRED;
862            }
863            else
864            {
865                CONSUMER_LOG.debug( "Got result code {} from producer {}. Replication stopped", resultCode,
866                    config.getProducer() );
867                return ReplicationStatusEnum.DISCONNECTED;
868            }
869        }
870    }
871
872
873    /**
874     * Disconnect from the producer
875     */
876    private void disconnect()
877    {
878        disconnected = true;
879
880        try
881        {
882            if ( ( connection != null ) && connection.isConnected() )
883            {
884                connection.unBind();
885                CONSUMER_LOG.info( "Unbound from the server {}", config.getProducer() );
886
887                if ( CONSUMER_LOG.isDebugEnabled() )
888                {
889                    MDC.put( "Replica", Integer.toString( config.getReplicaId() ) );
890                    CONSUMER_LOG.info( "Unbound from the server {}", config.getProducer() );
891                }
892
893                connection.close();
894                CONSUMER_LOG.info( "Connection closed for the server {}", config.getProducer() );
895
896                connection = null;
897            }
898        }
899        catch ( Exception e )
900        {
901            CONSUMER_LOG.error( "Failed to close the connection", e );
902        }
903        finally
904        {
905            // persist the cookie
906            storeCookie();
907
908            // reset the cookie
909            syncCookie = null;
910        }
911    }
912
913
914    /**
915     * stores the cookie.
916     */
917    private void storeCookie()
918    {
919        CONSUMER_LOG.debug( "Storing the cookie '{}'", Strings.utf8ToString( syncCookie ) );
920
921        if ( syncCookie == null )
922        {
923            return;
924        }
925
926        if ( ( lastSavedCookie != null ) && Arrays.equals( syncCookie, lastSavedCookie ) )
927        {
928            return;
929        }
930
931        try
932        {
933            Attribute attr = cookieMod.getAttribute();
934            attr.clear();
935            attr.add( syncCookie );
936
937            String cookieString = Strings.utf8ToString( syncCookie );
938            int replicaId = LdapProtocolUtils.getReplicaId( cookieString );
939
940            Attribute ridAt = ridMod.getAttribute();
941            ridAt.clear();
942            ridAt.add( String.valueOf( replicaId ) );
943
944            CONSUMER_LOG.debug( "Storing the cookie in the DIT : {}", config.getConfigEntryDn() );
945
946            session.modify( config.getConfigEntryDn(), cookieMod );
947            CONSUMER_LOG.debug( "stored the cookie in entry {}", config.getConfigEntryDn() );
948
949            lastSavedCookie = new byte[syncCookie.length];
950            System.arraycopy( syncCookie, 0, lastSavedCookie, 0, syncCookie.length );
951        }
952        catch ( Exception e )
953        {
954            CONSUMER_LOG.error( "Failed to store the cookie in consumer entry {}", config.getConfigEntryDn(), e );
955        }
956    }
957
958
959    /**
960     * Read the cookie for a consumer
961     */
962    private void readCookie()
963    {
964        try
965        {
966            Entry entry = session.lookup( config.getConfigEntryDn(), SchemaConstants.ADS_REPL_COOKIE );
967
968            if ( entry != null )
969            {
970                Attribute attr = entry.get( adsReplCookieAT );
971
972                if ( attr != null )
973                {
974                    syncCookie = attr.getBytes();
975                    lastSavedCookie = syncCookie;
976                    String syncCookieString = Strings.utf8ToString( syncCookie );
977                    CONSUMER_LOG.debug( "Loaded cookie {} for consumer {}", syncCookieString, config.getReplicaId() );
978                }
979                else
980                {
981                    CONSUMER_LOG.debug( "No cookie found for consumer {}", config.getReplicaId() );
982                }
983            }
984            else
985            {
986                CONSUMER_LOG.debug( "Cannot find the configuration '{}' in the DIT for consumer {}",
987                    config.getConfigEntryDn(), config.getReplicaId() );
988            }
989        }
990        catch ( Exception e )
991        {
992            // can be ignored, most likely happens if there is no entry with the given Dn
993            // log in debug mode
994            CONSUMER_LOG.debug( "Failed to read the cookie, cannot find the entry '{}' in the DIT for consumer {}",
995                config.getConfigEntryDn(),
996                config.getReplicaId() );
997        }
998    }
999
1000
1001    /**
1002     * deletes the cookie and resets the syncCookie to null
1003     */
1004    private void removeCookie()
1005    {
1006        try
1007        {
1008            Attribute cookieAttr = new DefaultAttribute( adsReplCookieAT );
1009            Modification deleteCookieMod = new DefaultModification( ModificationOperation.REMOVE_ATTRIBUTE,
1010                cookieAttr );
1011            session.modify( config.getConfigEntryDn(), deleteCookieMod );
1012            CONSUMER_LOG.info( "resetting sync cookie of the consumer with config entry Dn {}",
1013                config.getConfigEntryDn() );
1014        }
1015        catch ( Exception e )
1016        {
1017            CONSUMER_LOG.warn( "Failed to delete the cookie from the consumer with config entry Dn {}",
1018                config.getConfigEntryDn() );
1019            CONSUMER_LOG.warn( "{}", e );
1020        }
1021
1022        syncCookie = null;
1023        lastSavedCookie = null;
1024    }
1025
1026
1027    private void applyModDnOperation( Entry remoteEntry, String entryUuid, int rid ) throws Exception
1028    {
1029        CONSUMER_LOG.debug( "MODDN for entry {}, new entry : {}", entryUuid, remoteEntry );
1030
1031        // Retrieve locally the moved or renamed entry
1032        String filter = "(entryUuid=" + entryUuid + ")";
1033        SearchRequest searchRequest = new SearchRequestImpl();
1034        searchRequest.setBase( new Dn( schemaManager, config.getBaseDn() ) );
1035        searchRequest.setFilter( filter );
1036        searchRequest.setScope( SearchScope.SUBTREE );
1037        searchRequest.addAttributes( SchemaConstants.ENTRY_UUID_AT, SchemaConstants.ENTRY_CSN_AT,
1038            SchemaConstants.ALL_USER_ATTRIBUTES );
1039
1040        Cursor<Entry> cursor = session.search( searchRequest );
1041        cursor.beforeFirst();
1042
1043        Entry localEntry = null;
1044
1045        if ( cursor.next() )
1046        {
1047            localEntry = cursor.get();
1048        }
1049
1050        cursor.close();
1051
1052        // can happen in MMR scenario
1053        if ( localEntry == null )
1054        {
1055            return;
1056        }
1057
1058        if ( config.isMmrMode() )
1059        {
1060            Csn localCsn = new Csn( localEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
1061            Csn remoteCsn = new Csn( remoteEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
1062
1063            if ( localCsn.compareTo( remoteCsn ) >= 0 )
1064            {
1065                // just discard the received modified entry, that is old
1066                CONSUMER_LOG.debug( "local modification is latest, discarding the modDn operation dn {}",
1067                    remoteEntry.getDn() );
1068                return;
1069            }
1070        }
1071
1072        // Compute the DN, parentDn and Rdn for both entries
1073        Dn localDn = localEntry.getDn();
1074        Dn remoteDn = directoryService.getDnFactory().create( remoteEntry.getDn().getName() );
1075
1076        Dn localParentDn = localDn.getParent();
1077        Dn remoteParentDn = directoryService.getDnFactory().create( remoteDn.getParent().getName() );
1078
1079        Rdn localRdn = localDn.getRdn();
1080        Rdn remoteRdn = directoryService.getDnFactory().create( remoteDn.getRdn().getName() ).getRdn();
1081
1082        // Check if the OldRdn has been deleted
1083        boolean deleteOldRdn = !remoteEntry.contains( localRdn.getNormType(), localRdn.getValue() );
1084
1085        if ( localRdn.equals( remoteRdn ) )
1086        {
1087            // If the RDN are equals, it's a MOVE
1088            CONSUMER_LOG.debug( "moving {} to the new parent {}", localDn, remoteParentDn );
1089            MoveOperationContext movCtx = new MoveOperationContext( session, localDn, remoteParentDn );
1090            movCtx.setReplEvent( true );
1091            movCtx.setRid( rid );
1092            directoryService.getOperationManager().move( movCtx );
1093        }
1094        else if ( localParentDn.equals( remoteParentDn ) )
1095        {
1096            // If the parentDn are equals, it's a RENAME
1097            CONSUMER_LOG.debug( "renaming the Dn {} with new Rdn {} and deleteOldRdn flag set to {}",
1098                localDn.getName(), remoteRdn.getName(), String.valueOf( deleteOldRdn ) );
1099
1100            RenameOperationContext renCtx = new RenameOperationContext( session, localDn, remoteRdn,
1101                deleteOldRdn );
1102            renCtx.setReplEvent( true );
1103            renCtx.setRid( rid );
1104            directoryService.getOperationManager().rename( renCtx );
1105        }
1106        else
1107        {
1108            // Otherwise, it's a MOVE and RENAME
1109            CONSUMER_LOG.debug(
1110                "moveAndRename on the Dn {} with new newParent Dn {}, new Rdn {} and deleteOldRdn flag set to {}",
1111                localDn.getName(),
1112                remoteParentDn.getName(),
1113                remoteRdn.getName(),
1114                String.valueOf( deleteOldRdn ) );
1115
1116            MoveAndRenameOperationContext movRenCtx = new MoveAndRenameOperationContext( session, localDn,
1117                remoteParentDn, remoteRdn, deleteOldRdn );
1118            movRenCtx.setReplEvent( true );
1119            movRenCtx.setRid( rid );
1120            directoryService.getOperationManager().moveAndRename( movRenCtx );
1121        }
1122    }
1123
1124
1125    private void modify( Entry remoteEntry, int rid ) throws Exception
1126    {
1127        String[] attributes = computeAttributes( config.getAttributes(), SchemaConstants.ALL_OPERATIONAL_ATTRIBUTES );
1128
1129        LookupOperationContext lookupCtx =
1130            new LookupOperationContext( session, remoteEntry.getDn(), attributes );
1131
1132        lookupCtx.setSyncreplLookup( true );
1133
1134        Entry localEntry;
1135
1136        Partition partition = session.getDirectoryService().getPartitionNexus().getPartition( remoteEntry.getDn() );
1137
1138        try ( PartitionTxn partitionTxn = partition.beginReadTransaction() )
1139        {
1140            lookupCtx.setTransaction( partitionTxn );
1141            localEntry = session.getDirectoryService().getOperationManager().lookup( lookupCtx );
1142        }
1143
1144        if ( config.isMmrMode() )
1145        {
1146            Csn localCsn = new Csn( localEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
1147            Csn remoteCsn = new Csn( remoteEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() );
1148
1149            if ( localCsn.compareTo( remoteCsn ) >= 0 )
1150            {
1151                // just discard the received modified entry, that is old
1152                CONSUMER_LOG.debug( "local modification is latest, discarding the modification of dn {}",
1153                    remoteEntry.getDn() );
1154                return;
1155            }
1156        }
1157
1158        remoteEntry.removeAttributes( MOD_IGNORE_AT );
1159        localEntry.removeAttributes( MOD_IGNORE_AT );
1160
1161        List<Modification> mods = new ArrayList<>();
1162        Iterator<Attribute> itr = localEntry.iterator();
1163
1164        while ( itr.hasNext() )
1165        {
1166            Attribute localAttr = itr.next();
1167            String attrId = localAttr.getId();
1168            Modification mod;
1169            Attribute remoteAttr = remoteEntry.get( attrId );
1170
1171            if ( remoteAttr != null ) // would be better if we compare the values also? or will it consume more time?
1172            {
1173                mod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, remoteAttr );
1174                remoteEntry.remove( remoteAttr );
1175            }
1176            else
1177            {
1178                mod = new DefaultModification( ModificationOperation.REMOVE_ATTRIBUTE, localAttr );
1179            }
1180
1181            mods.add( mod );
1182        }
1183
1184        if ( remoteEntry.size() > 0 )
1185        {
1186            itr = remoteEntry.iterator();
1187
1188            while ( itr.hasNext() )
1189            {
1190                mods.add( new DefaultModification( ModificationOperation.ADD_ATTRIBUTE, itr.next() ) );
1191            }
1192        }
1193
1194        List<Modification> serverModifications = new ArrayList<>( mods.size() );
1195
1196        for ( Modification mod : mods )
1197        {
1198            serverModifications.add( new DefaultModification( directoryService.getSchemaManager(), mod ) );
1199        }
1200
1201        ModifyOperationContext modifyContext = new ModifyOperationContext( session, remoteEntry.getDn(),
1202            serverModifications );
1203        modifyContext.setReplEvent( true );
1204        modifyContext.setRid( rid );
1205
1206        OperationManager operationManager = directoryService.getOperationManager();
1207        operationManager.modify( modifyContext );
1208    }
1209
1210
1211    /**
1212     * Create a new list combining a list and a newly added attribute
1213     */
1214    private String[] computeAttributes( String[] attributes, String addedAttribute )
1215    {
1216        if ( attributes != null )
1217        {
1218            if ( addedAttribute != null )
1219            {
1220                String[] combinedAttributes = new String[attributes.length + 1];
1221
1222                System.arraycopy( attributes, 0, combinedAttributes, 0, attributes.length );
1223                combinedAttributes[attributes.length] = addedAttribute;
1224
1225                return combinedAttributes;
1226            }
1227            else
1228            {
1229                return attributes;
1230            }
1231        }
1232        else
1233        {
1234            if ( addedAttribute != null )
1235            {
1236                return new String[]
1237                    { addedAttribute };
1238            }
1239            else
1240            {
1241                return StringConstants.EMPTY_STRINGS;
1242            }
1243        }
1244    }
1245
1246
1247    /**
1248     * deletes the entries having the UUID given in the list
1249     *
1250     * @param uuidList the list of UUIDs
1251     * @param replicaId TODO
1252     * @throws Exception in case of any problems while deleting the entries
1253     */
1254    private void deleteEntries( List<byte[]> uuidList, boolean isRefreshPresent, int replicaId ) throws Exception
1255    {
1256        if ( uuidList == null || uuidList.isEmpty() )
1257        {
1258            return;
1259        }
1260
1261        // if it is refreshPresent list then send all the UUIDs for
1262        // filtering, otherwise breaking the list will cause the
1263        // other present entries to be deleted from DIT
1264        if ( isRefreshPresent )
1265        {
1266            CONSUMER_LOG.debug( "refresh present syncinfo list has {} UUIDs", uuidList.size() );
1267            processDelete( uuidList, isRefreshPresent, replicaId );
1268            return;
1269        }
1270
1271        int nodeLimit = 10;
1272
1273        int count = uuidList.size() / nodeLimit;
1274
1275        int startIndex = 0;
1276        int i = 0;
1277        for ( ; i < count; i++ )
1278        {
1279            startIndex = i * nodeLimit;
1280            processDelete( uuidList.subList( startIndex, startIndex + nodeLimit ), isRefreshPresent, replicaId );
1281        }
1282
1283        if ( ( uuidList.size() % nodeLimit ) != 0 )
1284        {
1285            // remove the remaining entries
1286            if ( count > 0 )
1287            {
1288                startIndex = i * nodeLimit;
1289            }
1290
1291            processDelete( uuidList.subList( startIndex, uuidList.size() ), isRefreshPresent, replicaId );
1292        }
1293    }
1294
1295
1296    /**
1297     * do not call this method directly, instead call deleteEntries()
1298     *
1299     * @param limitedUuidList a list of UUIDs whose size is less than or equal to #NODE_LIMIT (node limit applies only for refreshDeletes list)
1300     * @param isRefreshPresent a flag indicating the type of entries present in the UUID list
1301     * @param replicaId TODO
1302     */
1303    private void processDelete( List<byte[]> limitedUuidList, boolean isRefreshPresent, int replicaId )
1304        throws Exception
1305    {
1306        ExprNode filter = null;
1307        int size = limitedUuidList.size();
1308        if ( size == 1 )
1309        {
1310            String uuid = Strings.uuidToString( limitedUuidList.get( 0 ) );
1311
1312            filter = new EqualityNode<String>( SchemaConstants.ENTRY_UUID_AT, uuid );
1313            if ( isRefreshPresent )
1314            {
1315                filter = new NotNode( filter );
1316            }
1317        }
1318        else
1319        {
1320            if ( isRefreshPresent )
1321            {
1322                filter = new AndNode();
1323            }
1324            else
1325            {
1326                filter = new OrNode();
1327            }
1328
1329            for ( int i = 0; i < size; i++ )
1330            {
1331                String uuid = Strings.uuidToString( limitedUuidList.get( i ) );
1332                ExprNode uuidEqNode = new EqualityNode<String>( SchemaConstants.ENTRY_UUID_AT, uuid );
1333
1334                if ( isRefreshPresent )
1335                {
1336                    uuidEqNode = new NotNode( uuidEqNode );
1337                    ( ( AndNode ) filter ).addNode( uuidEqNode );
1338                }
1339                else
1340                {
1341                    ( ( OrNode ) filter ).addNode( uuidEqNode );
1342                }
1343            }
1344        }
1345
1346        Dn dn = new Dn( schemaManager, config.getBaseDn() );
1347
1348        if ( CONSUMER_LOG.isDebugEnabled() )
1349        {
1350            CONSUMER_LOG.debug( "selecting entries to be deleted using filter {}", filter );
1351        }
1352
1353        SearchRequest req = new SearchRequestImpl();
1354        req.setBase( dn );
1355        req.setFilter( filter );
1356        req.setScope( SearchScope.SUBTREE );
1357        req.setDerefAliases( AliasDerefMode.NEVER_DEREF_ALIASES );
1358        // the ENTRY_DN_AT must be in the attribute list, otherwise sorting fails
1359        req.addAttributes( SchemaConstants.ENTRY_DN_AT );
1360
1361        SortKey sk = new SortKey( SchemaConstants.ENTRY_DN_AT, SchemaConstants.DISTINGUISHED_NAME_MATCH_MR_OID );
1362        SortRequest ctrl = new SortRequestImpl();
1363        ctrl.addSortKey( sk );
1364        req.addControl( ctrl );
1365
1366        OperationManager operationManager = directoryService.getOperationManager();
1367
1368        Cursor<Entry> cursor = session.search( req );
1369        cursor.beforeFirst();
1370
1371        while ( cursor.next() )
1372        {
1373            Entry entry = cursor.get();
1374
1375            DeleteOperationContext ctx = new DeleteOperationContext( session );
1376            ctx.setReplEvent( true );
1377            ctx.setRid( replicaId );
1378
1379            // DO NOT generate replication event if this is being deleted as part of
1380            // e_sync_refresh_required
1381            if ( reload )
1382            {
1383                ctx.setGenerateNoReplEvt( true );
1384            }
1385
1386            ctx.setDn( entry.getDn() );
1387            operationManager.delete( ctx );
1388        }
1389
1390        cursor.close();
1391    }
1392
1393
1394    private synchronized Object getLockFor( String uuid )
1395    {
1396        Object lock = UUID_LOCK_MAP.get( uuid );
1397
1398        if ( lock == null )
1399        {
1400            lock = new Object();
1401            UUID_LOCK_MAP.put( uuid, lock );
1402        }
1403
1404        return lock;
1405    }
1406
1407
1408    /**
1409     * removes all child entries present under the given Dn and finally the Dn itself
1410     *
1411     * @param rootDn the Dn which will be removed after removing its children
1412     * @param rid the replica ID
1413     * @throws Exception If the Dn is not valid or if the deletion failed
1414     */
1415    private void deleteRecursive( Dn rootDn, int rid ) throws Exception
1416    {
1417        CONSUMER_LOG.debug( "searching for Dn {} and its children before deleting", rootDn.getName() );
1418        Cursor<Entry> cursor = null;
1419
1420        try
1421        {
1422            SearchRequest req = new SearchRequestImpl();
1423            req.setBase( rootDn );
1424            req.setFilter( ENTRY_UUID_PRESENCE_FILTER );
1425            req.setScope( SearchScope.SUBTREE );
1426            req.setDerefAliases( AliasDerefMode.NEVER_DEREF_ALIASES );
1427            // the ENTRY_DN_AT must be in the attribute list, otherwise sorting fails
1428            req.addAttributes( SchemaConstants.ENTRY_DN_AT );
1429
1430            SortKey sk = new SortKey( SchemaConstants.ENTRY_DN_AT, SchemaConstants.DISTINGUISHED_NAME_MATCH_MR_OID );
1431
1432            SortRequest ctrl = new SortRequestImpl();
1433            ctrl.addSortKey( sk );
1434            req.addControl( ctrl );
1435
1436            cursor = session.search( req );
1437            cursor.beforeFirst();
1438
1439            OperationManager operationManager = directoryService.getOperationManager();
1440
1441            while ( cursor.next() )
1442            {
1443                Entry e = cursor.get();
1444
1445                DeleteOperationContext ctx = new DeleteOperationContext( session );
1446                ctx.setReplEvent( true );
1447                ctx.setRid( rid );
1448
1449                // DO NOT generate replication event if this is being deleted as part of
1450                // e_sync_refresh_required
1451                if ( reload )
1452                {
1453                    ctx.setGenerateNoReplEvt( true );
1454                }
1455
1456                ctx.setDn( e.getDn() );
1457
1458                operationManager.delete( ctx );
1459            }
1460        }
1461        catch ( Exception e )
1462        {
1463            String msg = "Failed to delete the Dn " + rootDn.getName() + " and its children (if any present)";
1464            CONSUMER_LOG.error( msg, e );
1465            throw e;
1466        }
1467        finally
1468        {
1469            if ( cursor != null )
1470            {
1471                cursor.close();
1472            }
1473        }
1474    }
1475
1476
1477    /**
1478     * @see Object#toString()
1479     */
1480    @Override
1481    public String toString()
1482    {
1483        StringBuilder sb = new StringBuilder();
1484
1485        sb.append( "Consumer " ).append( config );
1486
1487        return sb.toString();
1488    }
1489}