001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020package org.apache.directory.server.ldap.replication.consumer; 021 022 023import java.util.ArrayList; 024import java.util.Arrays; 025import java.util.Iterator; 026import java.util.List; 027import java.util.Map; 028 029import org.apache.commons.collections4.map.LRUMap; 030import org.apache.directory.api.ldap.extras.controls.SynchronizationModeEnum; 031import org.apache.directory.api.ldap.extras.controls.syncrepl.syncDone.SyncDoneValue; 032import org.apache.directory.api.ldap.extras.controls.syncrepl.syncRequest.SyncRequestValue; 033import org.apache.directory.api.ldap.extras.controls.syncrepl.syncRequest.SyncRequestValueImpl; 034import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateTypeEnum; 035import org.apache.directory.api.ldap.extras.controls.syncrepl.syncState.SyncStateValue; 036import org.apache.directory.api.ldap.extras.intermediate.syncrepl.SyncInfoValue; 037import org.apache.directory.api.ldap.extras.intermediate.syncrepl.SyncInfoValueImpl; 038import org.apache.directory.api.ldap.model.constants.Loggers; 039import org.apache.directory.api.ldap.model.constants.SchemaConstants; 040import org.apache.directory.api.ldap.model.csn.Csn; 041import org.apache.directory.api.ldap.model.cursor.Cursor; 042import org.apache.directory.api.ldap.model.entry.Attribute; 043import org.apache.directory.api.ldap.model.entry.DefaultAttribute; 044import org.apache.directory.api.ldap.model.entry.DefaultEntry; 045import org.apache.directory.api.ldap.model.entry.DefaultModification; 046import org.apache.directory.api.ldap.model.entry.Entry; 047import org.apache.directory.api.ldap.model.entry.Modification; 048import org.apache.directory.api.ldap.model.entry.ModificationOperation; 049import org.apache.directory.api.ldap.model.exception.LdapException; 050import org.apache.directory.api.ldap.model.exception.LdapNoSuchObjectException; 051import org.apache.directory.api.ldap.model.filter.AndNode; 052import org.apache.directory.api.ldap.model.filter.EqualityNode; 053import org.apache.directory.api.ldap.model.filter.ExprNode; 054import org.apache.directory.api.ldap.model.filter.NotNode; 055import org.apache.directory.api.ldap.model.filter.OrNode; 056import org.apache.directory.api.ldap.model.filter.PresenceNode; 057import org.apache.directory.api.ldap.model.message.AliasDerefMode; 058import org.apache.directory.api.ldap.model.message.IntermediateResponse; 059import org.apache.directory.api.ldap.model.message.Response; 060import org.apache.directory.api.ldap.model.message.ResultCodeEnum; 061import org.apache.directory.api.ldap.model.message.SearchRequest; 062import org.apache.directory.api.ldap.model.message.SearchRequestImpl; 063import org.apache.directory.api.ldap.model.message.SearchResultDone; 064import org.apache.directory.api.ldap.model.message.SearchResultEntry; 065import org.apache.directory.api.ldap.model.message.SearchResultReference; 066import org.apache.directory.api.ldap.model.message.SearchScope; 067import org.apache.directory.api.ldap.model.message.controls.ManageDsaITImpl; 068import org.apache.directory.api.ldap.model.message.controls.SortKey; 069import org.apache.directory.api.ldap.model.message.controls.SortRequest; 070import org.apache.directory.api.ldap.model.message.controls.SortRequestImpl; 071import org.apache.directory.api.ldap.model.name.Dn; 072import org.apache.directory.api.ldap.model.name.Rdn; 073import org.apache.directory.api.ldap.model.schema.AttributeType; 074import org.apache.directory.api.ldap.model.schema.SchemaManager; 075import org.apache.directory.api.util.StringConstants; 076import org.apache.directory.api.util.Strings; 077import org.apache.directory.ldap.client.api.ConnectionClosedEventListener; 078import org.apache.directory.ldap.client.api.LdapNetworkConnection; 079import org.apache.directory.ldap.client.api.future.SearchFuture; 080import org.apache.directory.server.constants.ApacheSchemaConstants; 081import org.apache.directory.server.core.api.CoreSession; 082import org.apache.directory.server.core.api.DirectoryService; 083import org.apache.directory.server.core.api.OperationManager; 084import org.apache.directory.server.core.api.interceptor.context.AddOperationContext; 085import org.apache.directory.server.core.api.interceptor.context.DeleteOperationContext; 086import org.apache.directory.server.core.api.interceptor.context.LookupOperationContext; 087import org.apache.directory.server.core.api.interceptor.context.ModifyOperationContext; 088import org.apache.directory.server.core.api.interceptor.context.MoveAndRenameOperationContext; 089import org.apache.directory.server.core.api.interceptor.context.MoveOperationContext; 090import org.apache.directory.server.core.api.interceptor.context.RenameOperationContext; 091import org.apache.directory.server.core.api.partition.Partition; 092import org.apache.directory.server.core.api.partition.PartitionTxn; 093import org.apache.directory.server.ldap.LdapProtocolUtils; 094import org.apache.directory.server.ldap.replication.ReplicationConsumerConfig; 095import org.apache.directory.server.ldap.replication.SyncReplConfiguration; 096import org.slf4j.Logger; 097import org.slf4j.LoggerFactory; 098import org.slf4j.MDC; 099 100 101/** 102 * Implementation of syncrepl slave a.k.a consumer. 103 * 104 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a> 105 */ 106public class ReplicationConsumerImpl implements ConnectionClosedEventListener, ReplicationConsumer 107{ 108 /** A dedicated logger for the consumer */ 109 private static final Logger CONSUMER_LOG = LoggerFactory.getLogger( Loggers.CONSUMER_LOG.getName() ); 110 111 /** the syncrepl configuration */ 112 private SyncReplConfiguration config; 113 114 /** the sync cookie sent by the server */ 115 private byte[] syncCookie; 116 117 /** connection to the syncrepl provider */ 118 private LdapNetworkConnection connection; 119 120 /** the search request with control */ 121 private SearchRequest searchRequest; 122 123 /** a reference to the directoryService */ 124 private DirectoryService directoryService; 125 126 /** the schema manager */ 127 private SchemaManager schemaManager; 128 129 /** flag to indicate whether the consumer was disconnected */ 130 private volatile boolean disconnected; 131 132 /** the core session */ 133 private CoreSession session; 134 135 /** attributes on which modification should be ignored */ 136 private static final String[] MOD_IGNORE_AT = new String[] 137 { 138 SchemaConstants.ENTRY_UUID_AT, 139 SchemaConstants.ENTRY_DN_AT, 140 SchemaConstants.CREATE_TIMESTAMP_AT, 141 SchemaConstants.CREATORS_NAME_AT, 142 ApacheSchemaConstants.ENTRY_PARENT_ID_AT, 143 SchemaConstants.COLLECTIVE_ATTRIBUTE_SUBENTRIES_AT, 144 SchemaConstants.CONTEXT_CSN_AT, 145 ApacheSchemaConstants.NB_CHILDREN_AT, 146 ApacheSchemaConstants.NB_SUBORDINATES_AT, 147 SchemaConstants.HAS_SUBORDINATES_AT, 148 SchemaConstants.STRUCTURAL_OBJECT_CLASS_AT, 149 }; 150 151 /** the cookie that was saved last time */ 152 private byte[] lastSavedCookie; 153 154 private volatile boolean reload = false; 155 156 /** The (entrtyUuid=*) filter */ 157 private static final PresenceNode ENTRY_UUID_PRESENCE_FILTER = new PresenceNode( SchemaConstants.ENTRY_UUID_AT ); 158 159 private Modification cookieMod; 160 161 private Modification ridMod; 162 163 /** AttributeTypes used for replication */ 164 private AttributeType adsReplCookieAT; 165 private AttributeType adsDsReplicaIdAT; 166 167 private static final Map<String, Object> UUID_LOCK_MAP = new LRUMap( 1000 ); 168 169 170 /** 171 * @return the config 172 */ 173 @Override 174 public SyncReplConfiguration getConfig() 175 { 176 return config; 177 } 178 179 180 /** 181 * Init the replication service 182 * @param directoryservice The directory service 183 */ 184 @Override 185 public void init( DirectoryService directoryservice ) throws Exception 186 { 187 this.directoryService = directoryservice; 188 189 session = directoryService.getAdminSession(); 190 191 schemaManager = directoryservice.getSchemaManager(); 192 193 adsReplCookieAT = schemaManager.lookupAttributeTypeRegistry( SchemaConstants.ADS_REPL_COOKIE ); 194 adsDsReplicaIdAT = schemaManager.lookupAttributeTypeRegistry( SchemaConstants.ADS_DS_REPLICA_ID ); 195 196 Attribute cookieAttr = new DefaultAttribute( adsReplCookieAT ); 197 cookieMod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, cookieAttr ); 198 199 Attribute ridAttr = new DefaultAttribute( adsDsReplicaIdAT ); 200 ridMod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, ridAttr ); 201 202 prepareSyncSearchRequest(); 203 } 204 205 206 /** 207 * Connect to the remote server. Note that a SyncRepl consumer will be connected to only 208 * one remote server 209 * 210 * @return true if the connections have been successful. 211 */ 212 public boolean connect() 213 { 214 String providerHost = config.getRemoteHost(); 215 int port = config.getRemotePort(); 216 217 try 218 { 219 // Create a connection 220 if ( connection == null ) 221 { 222 connection = new LdapNetworkConnection( providerHost, port ); 223 connection.setSchemaManager( schemaManager ); 224 225 if ( config.isUseTls() ) 226 { 227 connection.getConfig().setTrustManagers( config.getTrustManager() ); 228 connection.getConfig().setUseTls( true ); 229 } 230 231 connection.addConnectionClosedEventListener( this ); 232 } 233 234 // Try to connect 235 if ( connection.connect() ) 236 { 237 CONSUMER_LOG.info( "Consumer {} connected to producer {}", config.getReplicaId(), config.getProducer() ); 238 239 // Do a bind 240 try 241 { 242 connection.bind( config.getReplUserDn(), Strings.utf8ToString( config.getReplUserPassword() ) ); 243 disconnected = false; 244 245 return true; 246 } 247 catch ( LdapException le ) 248 { 249 CONSUMER_LOG.warn( "Failed to bind to the producer {} with the given bind Dn {}", 250 config.getProducer(), config.getReplUserDn() ); 251 CONSUMER_LOG.warn( "", le ); 252 disconnected = true; 253 } 254 } 255 else 256 { 257 CONSUMER_LOG.warn( "Consumer {} cannot connect to producer {}", config.getReplicaId(), 258 config.getProducer() ); 259 disconnected = true; 260 261 return false; 262 } 263 } 264 catch ( Exception e ) 265 { 266 CONSUMER_LOG.error( "Failed to connect to the producer {}, cause : {}", config.getProducer(), 267 e.getMessage() ); 268 disconnected = true; 269 } 270 271 return false; 272 } 273 274 275 /** 276 * prepares a SearchRequest for syncing DIT content. 277 * 278 */ 279 private void prepareSyncSearchRequest() throws LdapException 280 { 281 String baseDn = config.getBaseDn(); 282 283 searchRequest = new SearchRequestImpl(); 284 285 searchRequest.setBase( new Dn( baseDn ) ); 286 searchRequest.setFilter( config.getFilter() ); 287 searchRequest.setSizeLimit( config.getSearchSizeLimit() ); 288 searchRequest.setTimeLimit( config.getSearchTimeout() ); 289 290 searchRequest.setDerefAliases( config.getAliasDerefMode() ); 291 searchRequest.setScope( config.getSearchScope() ); 292 searchRequest.setTypesOnly( false ); 293 294 searchRequest.addAttributes( config.getAttributes() ); 295 296 if ( !config.isChaseReferrals() ) 297 { 298 searchRequest.addControl( new ManageDsaITImpl() ); 299 } 300 301 if ( CONSUMER_LOG.isDebugEnabled() ) 302 { 303 MDC.put( "Replica", Integer.toString( config.getReplicaId() ) ); 304 CONSUMER_LOG.debug( "Configuring consumer {}", config ); 305 } 306 } 307 308 309 private ResultCodeEnum handleSearchResultDone( SearchResultDone searchDone ) 310 { 311 CONSUMER_LOG.debug( "///////////////// handleSearchDone //////////////////" ); 312 313 SyncDoneValue ctrl = ( SyncDoneValue ) searchDone.getControls().get( SyncDoneValue.OID ); 314 315 if ( ( ctrl != null ) && ( ctrl.getCookie() != null ) ) 316 { 317 syncCookie = ctrl.getCookie(); 318 CONSUMER_LOG.debug( "assigning cookie from sync done value control: {}", Strings.utf8ToString( syncCookie ) ); 319 storeCookie(); 320 } 321 322 CONSUMER_LOG.debug( "//////////////// END handleSearchDone//////////////////////" ); 323 324 reload = false; 325 326 return searchDone.getLdapResult().getResultCode(); 327 } 328 329 330 private void handleSearchReference( SearchResultReference searchRef ) 331 { 332 // this method won't be called cause the provider will serve the referrals as 333 // normal entry objects due to the usage of ManageDsaITControl in the search request 334 } 335 336 337 /** 338 * Process a SearchResultEntry received from a consumer. We have to handle all the 339 * cases : 340 * - Add 341 * - Modify 342 * - Moddn 343 * - Delete 344 * - Present 345 * @param syncResult 346 */ 347 private void handleSearchResultEntry( SearchResultEntry syncResult ) 348 { 349 CONSUMER_LOG.debug( "------------- starting handleSearchResult ------------" ); 350 351 SyncStateValue syncStateCtrl = ( SyncStateValue ) syncResult.getControl( SyncStateValue.OID ); 352 353 try 354 { 355 Entry remoteEntry = new DefaultEntry( schemaManager, syncResult.getEntry() ); 356 String uuid = remoteEntry.get( directoryService.getAtProvider().getEntryUUID() ).getString(); 357 // lock on UUID to serialize the updates when there are multiple consumers 358 // connected to several producers and to the *same* base/partition 359 Object lock = getLockFor( uuid ); 360 361 synchronized ( lock ) 362 { 363 int rid = -1; 364 365 if ( syncStateCtrl.getCookie() != null ) 366 { 367 syncCookie = syncStateCtrl.getCookie(); 368 rid = LdapProtocolUtils.getReplicaId( Strings.utf8ToString( syncCookie ) ); 369 CONSUMER_LOG.debug( "assigning the cookie from sync state value control: {}", 370 Strings.utf8ToString( syncCookie ) ); 371 } 372 373 SyncStateTypeEnum state = syncStateCtrl.getSyncStateType(); 374 375 // check to avoid conversion of UUID from byte[] to String 376 if ( CONSUMER_LOG.isDebugEnabled() ) 377 { 378 CONSUMER_LOG.debug( "state name {}", state.name() ); 379 CONSUMER_LOG.debug( "entryUUID = {}", Strings.uuidToString( syncStateCtrl.getEntryUUID() ) ); 380 } 381 382 Dn remoteDn = remoteEntry.getDn(); 383 384 switch ( state ) 385 { 386 case ADD: 387 boolean remoteDnExist = false; 388 389 try 390 { 391 remoteDnExist = session.exists( remoteDn ); 392 } 393 catch ( LdapNoSuchObjectException lnsoe ) 394 { 395 CONSUMER_LOG.error( lnsoe.getMessage() ); 396 } 397 398 if ( !remoteDnExist ) 399 { 400 CONSUMER_LOG.debug( "adding entry with dn {}", remoteDn ); 401 CONSUMER_LOG.debug( remoteEntry.toString() ); 402 AddOperationContext addContext = new AddOperationContext( session, remoteEntry ); 403 addContext.setReplEvent( true ); 404 addContext.setRid( rid ); 405 406 OperationManager operationManager = directoryService.getOperationManager(); 407 operationManager.add( addContext ); 408 } 409 else 410 { 411 CONSUMER_LOG.debug( "updating entry in refreshOnly mode {}", remoteDn ); 412 modify( remoteEntry, rid ); 413 } 414 415 break; 416 417 case MODIFY: 418 CONSUMER_LOG.debug( "modifying entry with dn {}", remoteEntry.getDn().getName() ); 419 modify( remoteEntry, rid ); 420 421 break; 422 423 case MODDN: 424 String entryUuid = Strings.uuidToString( syncStateCtrl.getEntryUUID() ); 425 applyModDnOperation( remoteEntry, entryUuid, rid ); 426 427 break; 428 429 case DELETE: 430 CONSUMER_LOG.debug( "deleting entry with dn {}", remoteEntry.getDn().getName() ); 431 432 if ( !session.exists( remoteDn ) ) 433 { 434 CONSUMER_LOG 435 .debug( 436 "looks like entry {} was already deleted in a prior update (possibly from another provider), skipping delete", 437 remoteDn ); 438 } 439 else 440 { 441 // incase of a MODDN operation resulting in a branch to be moved out of scope 442 // ApacheDS replication provider sends a single delete event on the Dn of the moved branch 443 // so the branch needs to be recursively deleted here 444 deleteRecursive( remoteEntry.getDn(), rid ); 445 } 446 447 break; 448 449 case PRESENT: 450 CONSUMER_LOG.debug( "entry present {}", remoteEntry ); 451 break; 452 453 default: 454 throw new IllegalArgumentException( "Unexpected sync state " + state ); 455 } 456 457 // store the cookie only if the above operation was successful 458 if ( syncStateCtrl.getCookie() != null ) 459 { 460 storeCookie(); 461 } 462 } 463 } 464 catch ( Exception e ) 465 { 466 CONSUMER_LOG.error( e.getMessage(), e ); 467 } 468 469 CONSUMER_LOG.debug( "------------- Ending handleSearchResult ------------" ); 470 } 471 472 473 /** 474 * {@inheritDoc} 475 */ 476 private void handleSyncInfo( IntermediateResponse syncInfoResp ) 477 { 478 try 479 { 480 CONSUMER_LOG.debug( "............... inside handleSyncInfo ..............." ); 481 482 byte[] syncInfoBytes = syncInfoResp.getResponseValue(); 483 484 if ( syncInfoBytes == null ) 485 { 486 return; 487 } 488 489 SyncInfoValue syncInfoValue = new SyncInfoValueImpl(); 490 491 byte[] cookie = syncInfoValue.getCookie(); 492 493 if ( CONSUMER_LOG.isDebugEnabled() ) 494 { 495 CONSUMER_LOG.debug( "Received a SyncInfoValue from producer {} : {}", config.getProducer(), 496 syncInfoValue ); 497 } 498 499 int replicaId = -1; 500 501 if ( cookie != null ) 502 { 503 if ( CONSUMER_LOG.isDebugEnabled() ) 504 { 505 CONSUMER_LOG.debug( "setting the cookie from the sync info: {}", Strings.utf8ToString( cookie ) ); 506 CONSUMER_LOG.debug( "setting the cookie from the sync info: {}", Strings.utf8ToString( cookie ) ); 507 } 508 509 syncCookie = cookie; 510 511 String cookieString = Strings.utf8ToString( syncCookie ); 512 replicaId = LdapProtocolUtils.getReplicaId( cookieString ); 513 } 514 515 CONSUMER_LOG.info( "refreshDeletes: {}", syncInfoValue.isRefreshDeletes() ); 516 517 List<byte[]> uuidList = syncInfoValue.getSyncUUIDs(); 518 519 // if refreshDeletes set to true then delete all the entries with entryUUID 520 // present in the syncIdSet 521 if ( syncInfoValue.isRefreshDeletes() ) 522 { 523 deleteEntries( uuidList, false, replicaId ); 524 } 525 else 526 { 527 deleteEntries( uuidList, true, replicaId ); 528 } 529 530 CONSUMER_LOG.info( "refreshDone: {}", syncInfoValue.isRefreshDone() ); 531 532 storeCookie(); 533 } 534 catch ( Exception de ) 535 { 536 CONSUMER_LOG.error( "Failed to handle syncinfo message", de ); 537 } 538 539 CONSUMER_LOG.debug( ".................... END handleSyncInfo ..............." ); 540 } 541 542 543 /** 544 * {@inheritDoc} 545 */ 546 @Override 547 public void connectionClosed() 548 { 549 if ( CONSUMER_LOG.isDebugEnabled() ) 550 { 551 MDC.put( "Replica", Integer.toString( config.getReplicaId() ) ); 552 CONSUMER_LOG.debug( "Consumer {} session with {} has been closed ", config.getReplicaId(), 553 config.getProducer() ); 554 } 555 556 disconnect(); 557 } 558 559 560 /** 561 * Starts the synchronization operation 562 */ 563 @Override 564 public ReplicationStatusEnum startSync() 565 { 566 CONSUMER_LOG.debug( "Starting the SyncRepl process for consumer {}", config.getReplicaId() ); 567 568 // read the cookie if persisted 569 readCookie(); 570 571 if ( config.isRefreshNPersist() ) 572 { 573 try 574 { 575 CONSUMER_LOG.debug( "==================== Refresh And Persist ==========" ); 576 577 return doSyncSearch( SynchronizationModeEnum.REFRESH_AND_PERSIST, reload ); 578 } 579 catch ( Exception e ) 580 { 581 CONSUMER_LOG.error( "Failed to sync with refreshAndPersist mode", e ); 582 return ReplicationStatusEnum.DISCONNECTED; 583 } 584 } 585 else 586 { 587 return doRefreshOnly(); 588 } 589 } 590 591 592 private ReplicationStatusEnum doRefreshOnly() 593 { 594 while ( !disconnected ) 595 { 596 CONSUMER_LOG.debug( "==================== Refresh Only ==========" ); 597 598 try 599 { 600 doSyncSearch( SynchronizationModeEnum.REFRESH_ONLY, reload ); 601 602 CONSUMER_LOG.debug( "--------------------- Sleep for {} seconds ------------------", 603 ( config.getRefreshInterval() / 1000 ) ); 604 Thread.sleep( config.getRefreshInterval() ); 605 CONSUMER_LOG.debug( "--------------------- syncing again ------------------" ); 606 607 } 608 catch ( InterruptedException ie ) 609 { 610 CONSUMER_LOG.warn( "refresher thread interrupted" ); 611 612 return ReplicationStatusEnum.DISCONNECTED; 613 } 614 catch ( Exception e ) 615 { 616 CONSUMER_LOG.error( "Failed to sync with refresh only mode", e ); 617 return ReplicationStatusEnum.DISCONNECTED; 618 } 619 } 620 621 return ReplicationStatusEnum.STOPPED; 622 } 623 624 625 /** 626 * {@inheritDoc} 627 */ 628 @Override 629 public void setConfig( ReplicationConsumerConfig config ) 630 { 631 this.config = ( SyncReplConfiguration ) config; 632 } 633 634 635 /** 636 * {@inheritDoc} 637 */ 638 @Override 639 public boolean connect( boolean now ) 640 { 641 boolean connected = false; 642 643 if ( now ) 644 { 645 connected = connect(); 646 } 647 648 while ( !connected ) 649 { 650 try 651 { 652 CONSUMER_LOG.debug( "Consumer {} cannot connect to {}, wait 5 seconds.", config.getReplicaId(), 653 config.getProducer() ); 654 655 // try to establish a connection for every 5 seconds 656 Thread.sleep( 5000 ); 657 } 658 catch ( InterruptedException e ) 659 { 660 CONSUMER_LOG.warn( "Consumer {} Interrupted while trying to reconnect to the provider {}", 661 config.getReplicaId(), config.getProducer() ); 662 } 663 664 connected = connect(); 665 } 666 667 // TODO : we may have cases were we get here with the connected flag to false. With the above 668 // code, thi sis not possible 669 670 return connected; 671 } 672 673 674 /** 675 * {@inheritDoc} 676 */ 677 @Override 678 public void ping() 679 { 680 boolean connected = !disconnected; 681 682 boolean restartSync = false; 683 684 if ( disconnected ) 685 { 686 connected = connect(); 687 restartSync = connected; 688 } 689 690 if ( connected ) 691 { 692 CONSUMER_LOG.debug( "PING : The consumer {} is alive", config.getReplicaId() ); 693 694 // DIRSERVER-2014 695 if ( restartSync ) 696 { 697 CONSUMER_LOG.warn( "Restarting the disconnected consumer {}", config.getReplicaId() ); 698 disconnected = false; 699 startSync(); 700 } 701 } 702 else 703 { 704 CONSUMER_LOG.debug( "PING : The consumer {} cannot be connected", config.getReplicaId() ); 705 } 706 } 707 708 709 /** 710 * {@inheritDoc} 711 */ 712 @Override 713 public void stop() 714 { 715 if ( !disconnected ) 716 { 717 disconnect(); 718 } 719 } 720 721 722 /** 723 * {@inheritDoc} 724 */ 725 @Override 726 public String getId() 727 { 728 return String.valueOf( getConfig().getReplicaId() ); 729 } 730 731 732 /** 733 * Performs a search on connection with updated syncRequest control. The provider 734 * will initiate an UpdateContant or an initContent depending on the current consumer 735 * status, accordingly to the cookie's content. 736 * If the mode is refreshOnly, the server will send a SearchResultDone when all the modified 737 * entries have been sent. 738 * If the mode is refreshAndPersist, the provider never send a SearchResultDone, so we keep 739 * receiving modifications' notifications on the consumer, and never exit the loop, unless 740 * some communication error occurs. 741 * 742 * @param syncType The synchornization type, either REFRESH_ONLY or REFRESH_AND_PERSIST 743 * @param reloadHint A flag used to tell the server that we want a reload 744 * @return The replication status 745 * @throws Exception in case of any problems encountered while searching 746 */ 747 private ReplicationStatusEnum doSyncSearch( SynchronizationModeEnum syncType, boolean reloadHint ) throws Exception 748 { 749 CONSUMER_LOG.debug( "Starting synchronization mode {}, reloadHint {}", syncType, reloadHint ); 750 // Prepare the Syncrepl Request 751 SyncRequestValue syncReq = new SyncRequestValueImpl(); 752 753 syncReq.setMode( syncType ); 754 syncReq.setReloadHint( reloadHint ); 755 756 // If we have a persisted cookie, send it. 757 if ( syncCookie != null ) 758 { 759 CONSUMER_LOG.debug( "searching on {} with searchRequest, cookie '{}'", config.getProducer(), 760 Strings.utf8ToString( syncCookie ) ); 761 syncReq.setCookie( syncCookie ); 762 } 763 else 764 { 765 CONSUMER_LOG.debug( "searching on {} with searchRequest, no cookie", config.getProducer() ); 766 } 767 768 searchRequest.addControl( syncReq ); 769 770 // Do the search. We use a searchAsync because we want to get SearchResultDone responses 771 SearchFuture sf = connection.searchAsync( searchRequest ); 772 773 Response resp = sf.get(); 774 775 CONSUMER_LOG.debug( "Response from {} : {}", config.getProducer(), resp ); 776 777 // Now, process the responses. We loop until we have a connection termination or 778 // a SearchResultDone (RefreshOnly mode) 779 while ( !( resp instanceof SearchResultDone ) && !sf.isCancelled() && !disconnected ) 780 { 781 if ( resp instanceof SearchResultEntry ) 782 { 783 SearchResultEntry result = ( SearchResultEntry ) resp; 784 785 handleSearchResultEntry( result ); 786 } 787 else if ( resp instanceof SearchResultReference ) 788 { 789 handleSearchReference( ( SearchResultReference ) resp ); 790 } 791 else if ( resp instanceof IntermediateResponse ) 792 { 793 handleSyncInfo( ( IntermediateResponse ) resp ); 794 } 795 796 // Next entry 797 resp = sf.get(); 798 CONSUMER_LOG.debug( "Response from {} : {}", config.getProducer(), resp ); 799 } 800 801 if ( sf.isCancelled() ) 802 { 803 804 CONSUMER_LOG.debug( "Search sync on {} has been canceled ", config.getProducer(), sf.getCause() ); 805 806 return ReplicationStatusEnum.DISCONNECTED; 807 } 808 else if ( disconnected ) 809 { 810 CONSUMER_LOG.debug( "Disconnected from {}", config.getProducer() ); 811 812 return ReplicationStatusEnum.DISCONNECTED; 813 } 814 else 815 { 816 ResultCodeEnum resultCode = handleSearchResultDone( ( SearchResultDone ) resp ); 817 818 CONSUMER_LOG.debug( "Rsultcode of Sync operation from {} : {}", config.getProducer(), resultCode ); 819 820 if ( resultCode == ResultCodeEnum.NO_SUCH_OBJECT ) 821 { 822 // log the error and handle it appropriately 823 CONSUMER_LOG.warn( "The base Dn {} is not found on provider {}", config.getBaseDn(), 824 config.getProducer() ); 825 826 CONSUMER_LOG.warn( "Disconnecting the Refresh&Persist consumer from provider {}", config.getProducer() ); 827 disconnect(); 828 829 return ReplicationStatusEnum.DISCONNECTED; 830 } 831 else if ( resultCode == ResultCodeEnum.E_SYNC_REFRESH_REQUIRED ) 832 { 833 CONSUMER_LOG.warn( "Full SYNC_REFRESH required from {}", config.getProducer() ); 834 835 reload = true; 836 837 try 838 { 839 CONSUMER_LOG.debug( "Deleting baseDN {}", config.getBaseDn() ); 840 841 // FIXME taking a backup right before deleting might be a good thing, just to be safe. 842 // the backup file can be deleted after reload completes successfully 843 844 // the 'rid' value is not taken into consideration when 'reload' is set 845 // so any dummy value is fine 846 deleteRecursive( new Dn( config.getBaseDn() ), -1000 ); 847 } 848 catch ( Exception e ) 849 { 850 CONSUMER_LOG 851 .error( 852 "Failed to delete the replica base as part of handling E_SYNC_REFRESH_REQUIRED, disconnecting the consumer", 853 e ); 854 } 855 856 // Do a full update. 857 removeCookie(); 858 859 CONSUMER_LOG.debug( "Re-doing a syncRefresh from producer {}", config.getProducer() ); 860 861 return ReplicationStatusEnum.REFRESH_REQUIRED; 862 } 863 else 864 { 865 CONSUMER_LOG.debug( "Got result code {} from producer {}. Replication stopped", resultCode, 866 config.getProducer() ); 867 return ReplicationStatusEnum.DISCONNECTED; 868 } 869 } 870 } 871 872 873 /** 874 * Disconnect from the producer 875 */ 876 private void disconnect() 877 { 878 disconnected = true; 879 880 try 881 { 882 if ( ( connection != null ) && connection.isConnected() ) 883 { 884 connection.unBind(); 885 CONSUMER_LOG.info( "Unbound from the server {}", config.getProducer() ); 886 887 if ( CONSUMER_LOG.isDebugEnabled() ) 888 { 889 MDC.put( "Replica", Integer.toString( config.getReplicaId() ) ); 890 CONSUMER_LOG.info( "Unbound from the server {}", config.getProducer() ); 891 } 892 893 connection.close(); 894 CONSUMER_LOG.info( "Connection closed for the server {}", config.getProducer() ); 895 896 connection = null; 897 } 898 } 899 catch ( Exception e ) 900 { 901 CONSUMER_LOG.error( "Failed to close the connection", e ); 902 } 903 finally 904 { 905 // persist the cookie 906 storeCookie(); 907 908 // reset the cookie 909 syncCookie = null; 910 } 911 } 912 913 914 /** 915 * stores the cookie. 916 */ 917 private void storeCookie() 918 { 919 CONSUMER_LOG.debug( "Storing the cookie '{}'", Strings.utf8ToString( syncCookie ) ); 920 921 if ( syncCookie == null ) 922 { 923 return; 924 } 925 926 if ( ( lastSavedCookie != null ) && Arrays.equals( syncCookie, lastSavedCookie ) ) 927 { 928 return; 929 } 930 931 try 932 { 933 Attribute attr = cookieMod.getAttribute(); 934 attr.clear(); 935 attr.add( syncCookie ); 936 937 String cookieString = Strings.utf8ToString( syncCookie ); 938 int replicaId = LdapProtocolUtils.getReplicaId( cookieString ); 939 940 Attribute ridAt = ridMod.getAttribute(); 941 ridAt.clear(); 942 ridAt.add( String.valueOf( replicaId ) ); 943 944 CONSUMER_LOG.debug( "Storing the cookie in the DIT : {}", config.getConfigEntryDn() ); 945 946 session.modify( config.getConfigEntryDn(), cookieMod ); 947 CONSUMER_LOG.debug( "stored the cookie in entry {}", config.getConfigEntryDn() ); 948 949 lastSavedCookie = new byte[syncCookie.length]; 950 System.arraycopy( syncCookie, 0, lastSavedCookie, 0, syncCookie.length ); 951 } 952 catch ( Exception e ) 953 { 954 CONSUMER_LOG.error( "Failed to store the cookie in consumer entry {}", config.getConfigEntryDn(), e ); 955 } 956 } 957 958 959 /** 960 * Read the cookie for a consumer 961 */ 962 private void readCookie() 963 { 964 try 965 { 966 Entry entry = session.lookup( config.getConfigEntryDn(), SchemaConstants.ADS_REPL_COOKIE ); 967 968 if ( entry != null ) 969 { 970 Attribute attr = entry.get( adsReplCookieAT ); 971 972 if ( attr != null ) 973 { 974 syncCookie = attr.getBytes(); 975 lastSavedCookie = syncCookie; 976 String syncCookieString = Strings.utf8ToString( syncCookie ); 977 CONSUMER_LOG.debug( "Loaded cookie {} for consumer {}", syncCookieString, config.getReplicaId() ); 978 } 979 else 980 { 981 CONSUMER_LOG.debug( "No cookie found for consumer {}", config.getReplicaId() ); 982 } 983 } 984 else 985 { 986 CONSUMER_LOG.debug( "Cannot find the configuration '{}' in the DIT for consumer {}", 987 config.getConfigEntryDn(), config.getReplicaId() ); 988 } 989 } 990 catch ( Exception e ) 991 { 992 // can be ignored, most likely happens if there is no entry with the given Dn 993 // log in debug mode 994 CONSUMER_LOG.debug( "Failed to read the cookie, cannot find the entry '{}' in the DIT for consumer {}", 995 config.getConfigEntryDn(), 996 config.getReplicaId() ); 997 } 998 } 999 1000 1001 /** 1002 * deletes the cookie and resets the syncCookie to null 1003 */ 1004 private void removeCookie() 1005 { 1006 try 1007 { 1008 Attribute cookieAttr = new DefaultAttribute( adsReplCookieAT ); 1009 Modification deleteCookieMod = new DefaultModification( ModificationOperation.REMOVE_ATTRIBUTE, 1010 cookieAttr ); 1011 session.modify( config.getConfigEntryDn(), deleteCookieMod ); 1012 CONSUMER_LOG.info( "resetting sync cookie of the consumer with config entry Dn {}", 1013 config.getConfigEntryDn() ); 1014 } 1015 catch ( Exception e ) 1016 { 1017 CONSUMER_LOG.warn( "Failed to delete the cookie from the consumer with config entry Dn {}", 1018 config.getConfigEntryDn() ); 1019 CONSUMER_LOG.warn( "{}", e ); 1020 } 1021 1022 syncCookie = null; 1023 lastSavedCookie = null; 1024 } 1025 1026 1027 private void applyModDnOperation( Entry remoteEntry, String entryUuid, int rid ) throws Exception 1028 { 1029 CONSUMER_LOG.debug( "MODDN for entry {}, new entry : {}", entryUuid, remoteEntry ); 1030 1031 // Retrieve locally the moved or renamed entry 1032 String filter = "(entryUuid=" + entryUuid + ")"; 1033 SearchRequest searchRequest = new SearchRequestImpl(); 1034 searchRequest.setBase( new Dn( schemaManager, config.getBaseDn() ) ); 1035 searchRequest.setFilter( filter ); 1036 searchRequest.setScope( SearchScope.SUBTREE ); 1037 searchRequest.addAttributes( SchemaConstants.ENTRY_UUID_AT, SchemaConstants.ENTRY_CSN_AT, 1038 SchemaConstants.ALL_USER_ATTRIBUTES ); 1039 1040 Cursor<Entry> cursor = session.search( searchRequest ); 1041 cursor.beforeFirst(); 1042 1043 Entry localEntry = null; 1044 1045 if ( cursor.next() ) 1046 { 1047 localEntry = cursor.get(); 1048 } 1049 1050 cursor.close(); 1051 1052 // can happen in MMR scenario 1053 if ( localEntry == null ) 1054 { 1055 return; 1056 } 1057 1058 if ( config.isMmrMode() ) 1059 { 1060 Csn localCsn = new Csn( localEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() ); 1061 Csn remoteCsn = new Csn( remoteEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() ); 1062 1063 if ( localCsn.compareTo( remoteCsn ) >= 0 ) 1064 { 1065 // just discard the received modified entry, that is old 1066 CONSUMER_LOG.debug( "local modification is latest, discarding the modDn operation dn {}", 1067 remoteEntry.getDn() ); 1068 return; 1069 } 1070 } 1071 1072 // Compute the DN, parentDn and Rdn for both entries 1073 Dn localDn = localEntry.getDn(); 1074 Dn remoteDn = directoryService.getDnFactory().create( remoteEntry.getDn().getName() ); 1075 1076 Dn localParentDn = localDn.getParent(); 1077 Dn remoteParentDn = directoryService.getDnFactory().create( remoteDn.getParent().getName() ); 1078 1079 Rdn localRdn = localDn.getRdn(); 1080 Rdn remoteRdn = directoryService.getDnFactory().create( remoteDn.getRdn().getName() ).getRdn(); 1081 1082 // Check if the OldRdn has been deleted 1083 boolean deleteOldRdn = !remoteEntry.contains( localRdn.getNormType(), localRdn.getValue() ); 1084 1085 if ( localRdn.equals( remoteRdn ) ) 1086 { 1087 // If the RDN are equals, it's a MOVE 1088 CONSUMER_LOG.debug( "moving {} to the new parent {}", localDn, remoteParentDn ); 1089 MoveOperationContext movCtx = new MoveOperationContext( session, localDn, remoteParentDn ); 1090 movCtx.setReplEvent( true ); 1091 movCtx.setRid( rid ); 1092 directoryService.getOperationManager().move( movCtx ); 1093 } 1094 else if ( localParentDn.equals( remoteParentDn ) ) 1095 { 1096 // If the parentDn are equals, it's a RENAME 1097 CONSUMER_LOG.debug( "renaming the Dn {} with new Rdn {} and deleteOldRdn flag set to {}", 1098 localDn.getName(), remoteRdn.getName(), String.valueOf( deleteOldRdn ) ); 1099 1100 RenameOperationContext renCtx = new RenameOperationContext( session, localDn, remoteRdn, 1101 deleteOldRdn ); 1102 renCtx.setReplEvent( true ); 1103 renCtx.setRid( rid ); 1104 directoryService.getOperationManager().rename( renCtx ); 1105 } 1106 else 1107 { 1108 // Otherwise, it's a MOVE and RENAME 1109 CONSUMER_LOG.debug( 1110 "moveAndRename on the Dn {} with new newParent Dn {}, new Rdn {} and deleteOldRdn flag set to {}", 1111 localDn.getName(), 1112 remoteParentDn.getName(), 1113 remoteRdn.getName(), 1114 String.valueOf( deleteOldRdn ) ); 1115 1116 MoveAndRenameOperationContext movRenCtx = new MoveAndRenameOperationContext( session, localDn, 1117 remoteParentDn, remoteRdn, deleteOldRdn ); 1118 movRenCtx.setReplEvent( true ); 1119 movRenCtx.setRid( rid ); 1120 directoryService.getOperationManager().moveAndRename( movRenCtx ); 1121 } 1122 } 1123 1124 1125 private void modify( Entry remoteEntry, int rid ) throws Exception 1126 { 1127 String[] attributes = computeAttributes( config.getAttributes(), SchemaConstants.ALL_OPERATIONAL_ATTRIBUTES ); 1128 1129 LookupOperationContext lookupCtx = 1130 new LookupOperationContext( session, remoteEntry.getDn(), attributes ); 1131 1132 lookupCtx.setSyncreplLookup( true ); 1133 1134 Entry localEntry; 1135 1136 Partition partition = session.getDirectoryService().getPartitionNexus().getPartition( remoteEntry.getDn() ); 1137 1138 try ( PartitionTxn partitionTxn = partition.beginReadTransaction() ) 1139 { 1140 lookupCtx.setTransaction( partitionTxn ); 1141 localEntry = session.getDirectoryService().getOperationManager().lookup( lookupCtx ); 1142 } 1143 1144 if ( config.isMmrMode() ) 1145 { 1146 Csn localCsn = new Csn( localEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() ); 1147 Csn remoteCsn = new Csn( remoteEntry.get( SchemaConstants.ENTRY_CSN_AT ).getString() ); 1148 1149 if ( localCsn.compareTo( remoteCsn ) >= 0 ) 1150 { 1151 // just discard the received modified entry, that is old 1152 CONSUMER_LOG.debug( "local modification is latest, discarding the modification of dn {}", 1153 remoteEntry.getDn() ); 1154 return; 1155 } 1156 } 1157 1158 remoteEntry.removeAttributes( MOD_IGNORE_AT ); 1159 localEntry.removeAttributes( MOD_IGNORE_AT ); 1160 1161 List<Modification> mods = new ArrayList<>(); 1162 Iterator<Attribute> itr = localEntry.iterator(); 1163 1164 while ( itr.hasNext() ) 1165 { 1166 Attribute localAttr = itr.next(); 1167 String attrId = localAttr.getId(); 1168 Modification mod; 1169 Attribute remoteAttr = remoteEntry.get( attrId ); 1170 1171 if ( remoteAttr != null ) // would be better if we compare the values also? or will it consume more time? 1172 { 1173 mod = new DefaultModification( ModificationOperation.REPLACE_ATTRIBUTE, remoteAttr ); 1174 remoteEntry.remove( remoteAttr ); 1175 } 1176 else 1177 { 1178 mod = new DefaultModification( ModificationOperation.REMOVE_ATTRIBUTE, localAttr ); 1179 } 1180 1181 mods.add( mod ); 1182 } 1183 1184 if ( remoteEntry.size() > 0 ) 1185 { 1186 itr = remoteEntry.iterator(); 1187 1188 while ( itr.hasNext() ) 1189 { 1190 mods.add( new DefaultModification( ModificationOperation.ADD_ATTRIBUTE, itr.next() ) ); 1191 } 1192 } 1193 1194 List<Modification> serverModifications = new ArrayList<>( mods.size() ); 1195 1196 for ( Modification mod : mods ) 1197 { 1198 serverModifications.add( new DefaultModification( directoryService.getSchemaManager(), mod ) ); 1199 } 1200 1201 ModifyOperationContext modifyContext = new ModifyOperationContext( session, remoteEntry.getDn(), 1202 serverModifications ); 1203 modifyContext.setReplEvent( true ); 1204 modifyContext.setRid( rid ); 1205 1206 OperationManager operationManager = directoryService.getOperationManager(); 1207 operationManager.modify( modifyContext ); 1208 } 1209 1210 1211 /** 1212 * Create a new list combining a list and a newly added attribute 1213 */ 1214 private String[] computeAttributes( String[] attributes, String addedAttribute ) 1215 { 1216 if ( attributes != null ) 1217 { 1218 if ( addedAttribute != null ) 1219 { 1220 String[] combinedAttributes = new String[attributes.length + 1]; 1221 1222 System.arraycopy( attributes, 0, combinedAttributes, 0, attributes.length ); 1223 combinedAttributes[attributes.length] = addedAttribute; 1224 1225 return combinedAttributes; 1226 } 1227 else 1228 { 1229 return attributes; 1230 } 1231 } 1232 else 1233 { 1234 if ( addedAttribute != null ) 1235 { 1236 return new String[] 1237 { addedAttribute }; 1238 } 1239 else 1240 { 1241 return StringConstants.EMPTY_STRINGS; 1242 } 1243 } 1244 } 1245 1246 1247 /** 1248 * deletes the entries having the UUID given in the list 1249 * 1250 * @param uuidList the list of UUIDs 1251 * @param replicaId TODO 1252 * @throws Exception in case of any problems while deleting the entries 1253 */ 1254 private void deleteEntries( List<byte[]> uuidList, boolean isRefreshPresent, int replicaId ) throws Exception 1255 { 1256 if ( uuidList == null || uuidList.isEmpty() ) 1257 { 1258 return; 1259 } 1260 1261 // if it is refreshPresent list then send all the UUIDs for 1262 // filtering, otherwise breaking the list will cause the 1263 // other present entries to be deleted from DIT 1264 if ( isRefreshPresent ) 1265 { 1266 CONSUMER_LOG.debug( "refresh present syncinfo list has {} UUIDs", uuidList.size() ); 1267 processDelete( uuidList, isRefreshPresent, replicaId ); 1268 return; 1269 } 1270 1271 int nodeLimit = 10; 1272 1273 int count = uuidList.size() / nodeLimit; 1274 1275 int startIndex = 0; 1276 int i = 0; 1277 for ( ; i < count; i++ ) 1278 { 1279 startIndex = i * nodeLimit; 1280 processDelete( uuidList.subList( startIndex, startIndex + nodeLimit ), isRefreshPresent, replicaId ); 1281 } 1282 1283 if ( ( uuidList.size() % nodeLimit ) != 0 ) 1284 { 1285 // remove the remaining entries 1286 if ( count > 0 ) 1287 { 1288 startIndex = i * nodeLimit; 1289 } 1290 1291 processDelete( uuidList.subList( startIndex, uuidList.size() ), isRefreshPresent, replicaId ); 1292 } 1293 } 1294 1295 1296 /** 1297 * do not call this method directly, instead call deleteEntries() 1298 * 1299 * @param limitedUuidList a list of UUIDs whose size is less than or equal to #NODE_LIMIT (node limit applies only for refreshDeletes list) 1300 * @param isRefreshPresent a flag indicating the type of entries present in the UUID list 1301 * @param replicaId TODO 1302 */ 1303 private void processDelete( List<byte[]> limitedUuidList, boolean isRefreshPresent, int replicaId ) 1304 throws Exception 1305 { 1306 ExprNode filter = null; 1307 int size = limitedUuidList.size(); 1308 if ( size == 1 ) 1309 { 1310 String uuid = Strings.uuidToString( limitedUuidList.get( 0 ) ); 1311 1312 filter = new EqualityNode<String>( SchemaConstants.ENTRY_UUID_AT, uuid ); 1313 if ( isRefreshPresent ) 1314 { 1315 filter = new NotNode( filter ); 1316 } 1317 } 1318 else 1319 { 1320 if ( isRefreshPresent ) 1321 { 1322 filter = new AndNode(); 1323 } 1324 else 1325 { 1326 filter = new OrNode(); 1327 } 1328 1329 for ( int i = 0; i < size; i++ ) 1330 { 1331 String uuid = Strings.uuidToString( limitedUuidList.get( i ) ); 1332 ExprNode uuidEqNode = new EqualityNode<String>( SchemaConstants.ENTRY_UUID_AT, uuid ); 1333 1334 if ( isRefreshPresent ) 1335 { 1336 uuidEqNode = new NotNode( uuidEqNode ); 1337 ( ( AndNode ) filter ).addNode( uuidEqNode ); 1338 } 1339 else 1340 { 1341 ( ( OrNode ) filter ).addNode( uuidEqNode ); 1342 } 1343 } 1344 } 1345 1346 Dn dn = new Dn( schemaManager, config.getBaseDn() ); 1347 1348 if ( CONSUMER_LOG.isDebugEnabled() ) 1349 { 1350 CONSUMER_LOG.debug( "selecting entries to be deleted using filter {}", filter ); 1351 } 1352 1353 SearchRequest req = new SearchRequestImpl(); 1354 req.setBase( dn ); 1355 req.setFilter( filter ); 1356 req.setScope( SearchScope.SUBTREE ); 1357 req.setDerefAliases( AliasDerefMode.NEVER_DEREF_ALIASES ); 1358 // the ENTRY_DN_AT must be in the attribute list, otherwise sorting fails 1359 req.addAttributes( SchemaConstants.ENTRY_DN_AT ); 1360 1361 SortKey sk = new SortKey( SchemaConstants.ENTRY_DN_AT, SchemaConstants.DISTINGUISHED_NAME_MATCH_MR_OID ); 1362 SortRequest ctrl = new SortRequestImpl(); 1363 ctrl.addSortKey( sk ); 1364 req.addControl( ctrl ); 1365 1366 OperationManager operationManager = directoryService.getOperationManager(); 1367 1368 Cursor<Entry> cursor = session.search( req ); 1369 cursor.beforeFirst(); 1370 1371 while ( cursor.next() ) 1372 { 1373 Entry entry = cursor.get(); 1374 1375 DeleteOperationContext ctx = new DeleteOperationContext( session ); 1376 ctx.setReplEvent( true ); 1377 ctx.setRid( replicaId ); 1378 1379 // DO NOT generate replication event if this is being deleted as part of 1380 // e_sync_refresh_required 1381 if ( reload ) 1382 { 1383 ctx.setGenerateNoReplEvt( true ); 1384 } 1385 1386 ctx.setDn( entry.getDn() ); 1387 operationManager.delete( ctx ); 1388 } 1389 1390 cursor.close(); 1391 } 1392 1393 1394 private synchronized Object getLockFor( String uuid ) 1395 { 1396 Object lock = UUID_LOCK_MAP.get( uuid ); 1397 1398 if ( lock == null ) 1399 { 1400 lock = new Object(); 1401 UUID_LOCK_MAP.put( uuid, lock ); 1402 } 1403 1404 return lock; 1405 } 1406 1407 1408 /** 1409 * removes all child entries present under the given Dn and finally the Dn itself 1410 * 1411 * @param rootDn the Dn which will be removed after removing its children 1412 * @param rid the replica ID 1413 * @throws Exception If the Dn is not valid or if the deletion failed 1414 */ 1415 private void deleteRecursive( Dn rootDn, int rid ) throws Exception 1416 { 1417 CONSUMER_LOG.debug( "searching for Dn {} and its children before deleting", rootDn.getName() ); 1418 Cursor<Entry> cursor = null; 1419 1420 try 1421 { 1422 SearchRequest req = new SearchRequestImpl(); 1423 req.setBase( rootDn ); 1424 req.setFilter( ENTRY_UUID_PRESENCE_FILTER ); 1425 req.setScope( SearchScope.SUBTREE ); 1426 req.setDerefAliases( AliasDerefMode.NEVER_DEREF_ALIASES ); 1427 // the ENTRY_DN_AT must be in the attribute list, otherwise sorting fails 1428 req.addAttributes( SchemaConstants.ENTRY_DN_AT ); 1429 1430 SortKey sk = new SortKey( SchemaConstants.ENTRY_DN_AT, SchemaConstants.DISTINGUISHED_NAME_MATCH_MR_OID ); 1431 1432 SortRequest ctrl = new SortRequestImpl(); 1433 ctrl.addSortKey( sk ); 1434 req.addControl( ctrl ); 1435 1436 cursor = session.search( req ); 1437 cursor.beforeFirst(); 1438 1439 OperationManager operationManager = directoryService.getOperationManager(); 1440 1441 while ( cursor.next() ) 1442 { 1443 Entry e = cursor.get(); 1444 1445 DeleteOperationContext ctx = new DeleteOperationContext( session ); 1446 ctx.setReplEvent( true ); 1447 ctx.setRid( rid ); 1448 1449 // DO NOT generate replication event if this is being deleted as part of 1450 // e_sync_refresh_required 1451 if ( reload ) 1452 { 1453 ctx.setGenerateNoReplEvt( true ); 1454 } 1455 1456 ctx.setDn( e.getDn() ); 1457 1458 operationManager.delete( ctx ); 1459 } 1460 } 1461 catch ( Exception e ) 1462 { 1463 String msg = "Failed to delete the Dn " + rootDn.getName() + " and its children (if any present)"; 1464 CONSUMER_LOG.error( msg, e ); 1465 throw e; 1466 } 1467 finally 1468 { 1469 if ( cursor != null ) 1470 { 1471 cursor.close(); 1472 } 1473 } 1474 } 1475 1476 1477 /** 1478 * @see Object#toString() 1479 */ 1480 @Override 1481 public String toString() 1482 { 1483 StringBuilder sb = new StringBuilder(); 1484 1485 sb.append( "Consumer " ).append( config ); 1486 1487 return sb.toString(); 1488 } 1489}