001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020 021package org.apache.directory.server.ldap.replication.provider; 022 023 024import java.io.File; 025import java.io.IOException; 026 027import jdbm.RecordManager; 028import jdbm.recman.BaseRecordManager; 029import jdbm.recman.TransactionManager; 030 031import org.apache.directory.api.ldap.model.constants.Loggers; 032import org.apache.directory.api.ldap.model.constants.SchemaConstants; 033import org.apache.directory.api.ldap.model.exception.LdapException; 034import org.apache.directory.api.ldap.model.name.Dn; 035import org.apache.directory.api.ldap.model.schema.SchemaManager; 036import org.apache.directory.api.ldap.model.schema.comparators.SerializableComparator; 037import org.apache.directory.server.core.api.DirectoryService; 038import org.apache.directory.server.core.api.event.EventType; 039import org.apache.directory.server.core.api.event.NotificationCriteria; 040import org.apache.directory.server.core.api.partition.PartitionTxn; 041import org.apache.directory.server.core.partition.impl.btree.jdbm.JdbmTable; 042import org.apache.directory.server.core.partition.impl.btree.jdbm.StringSerializer; 043import org.apache.directory.server.ldap.replication.ReplicaEventMessage; 044import org.apache.directory.server.ldap.replication.ReplicaEventMessageSerializer; 045import org.slf4j.Logger; 046import org.slf4j.LoggerFactory; 047 048 049/** 050 * A structure storing the configuration on each consumer registered on a producer. It stores 051 * the following informations : 052 * <ul> 053 * <li>replicaId : the internal ID associated with the consumer on the provider</li> 054 * <li>hostname : the consumer's host</li> 055 * <li>searchFilter : the filter</li> 056 * <li>lastSentCsn : the last CSN sent by the consumer</li> 057 * <li>refreshNPersist : a flag indicating that the consumer is processing in Refresh and persist mode</li> 058 * <li></li> 059 * </ul> 060 * A separate log is maintained for each syncrepl consumer.<br> 061 * We also associate a Queue with each structure, which will store the messages to send to the consumer. 062 * 063 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a> 064 */ 065public class ReplicaEventLog implements Comparable<ReplicaEventLog> 066{ 067 /** The logger */ 068 private static final Logger LOG = LoggerFactory.getLogger( ReplicaEventLog.class ); 069 070 /** A logger for the replication provider */ 071 private static final Logger PROVIDER_LOG = LoggerFactory.getLogger( Loggers.PROVIDER_LOG.getName() ); 072 073 /** hostname of the syncrepl consumer */ 074 private String hostName; 075 076 /** the unmodified search filter as it was when received from the client */ 077 private String searchFilter; 078 079 /** the csn that was sent to the client during the last sync session*/ 080 private String lastSentCsn; 081 082 /** the persistent listener */ 083 private SyncReplSearchListener persistentListener; 084 085 /** notification criteria used by the persistent search */ 086 private NotificationCriteria searchCriteria; 087 088 /** the replica id */ 089 private int replicaId; 090 091 /** flag indicating refreshAndPersist mode */ 092 private boolean refreshNPersist; 093 094 /** the duration(in seconds) of consumer inactivity after which this log will be deleted. Defaults to 172800 seconds (i.e. 2 days) */ 095 private long maxIdlePeriod = DEFAULT_MAX_IDLE_PERIOD; 096 097 /** the minimum number of entries to be present for beginning purging entries older than the last sent CSN. Default is 10000 */ 098 private int purgeThresholdCount = DEFAULT_PURGE_THRESHOLD_COUNT; 099 100 // fields that won't be serialized 101 /** The Journal of modifications */ 102 private JdbmTable<String, ReplicaEventMessage> journal; 103 104 /** the underlying file */ 105 private File journalFile; 106 107 /** The record manager*/ 108 private RecordManager recman; 109 110 /** A flag used to indicate that the consumer is not up to date */ 111 private volatile boolean dirty; 112 113 /** the DN of the entry where this event log details are stored */ 114 private Dn consumerEntryDn; 115 116 public static final String REPLICA_EVENT_LOG_NAME_PREFIX = "REPL_EVENT_LOG."; 117 118 public static final int DEFAULT_PURGE_THRESHOLD_COUNT = 10000; 119 120 /** The max delay for an idle replication log with no activity, by default the logs have no idle time period */ 121 public static final int DEFAULT_MAX_IDLE_PERIOD = -1; 122 123 /** The partition transaction */ 124 private PartitionTxn partitionTxn; 125 126 127 /** 128 * Creates a new instance of EventLog for a replica 129 * 130 * @param partitionTxn The Transaction to use 131 * @param directoryService The DirectoryService instance 132 * @param replicaId The replica ID 133 * @throws IOException if we weren't able to log the event 134 */ 135 public ReplicaEventLog( PartitionTxn partitionTxn, DirectoryService directoryService, int replicaId ) throws IOException 136 { 137 PROVIDER_LOG.debug( "Creating the replication queue for replica {}", replicaId ); 138 SchemaManager schemaManager = directoryService.getSchemaManager(); 139 this.replicaId = replicaId; 140 this.searchCriteria = new NotificationCriteria( schemaManager ); 141 this.searchCriteria.setEventMask( EventType.ALL_EVENT_TYPES_MASK ); 142 143 // Create the journal file, or open if already exists 144 File replDir = directoryService.getInstanceLayout().getReplDirectory(); 145 journalFile = new File( replDir, REPLICA_EVENT_LOG_NAME_PREFIX + replicaId ); 146 recman = new BaseRecordManager( journalFile.getAbsolutePath() ); 147 TransactionManager transactionManager = ( ( BaseRecordManager ) recman ).getTransactionManager(); 148 transactionManager.setMaximumTransactionsInLog( 200 ); 149 150 SerializableComparator<String> comparator = new SerializableComparator<>( 151 SchemaConstants.CSN_ORDERING_MATCH_MR_OID ); 152 comparator.setSchemaManager( schemaManager ); 153 154 journal = new JdbmTable<>( schemaManager, journalFile.getName(), recman, comparator, 155 StringSerializer.INSTANCE, new ReplicaEventMessageSerializer( schemaManager ) ); 156 157 this.partitionTxn = partitionTxn; 158 } 159 160 161 /** 162 * Stores the given message in the queue 163 * 164 * @param message The message to store 165 */ 166 public synchronized void log( ReplicaEventMessage message ) 167 { 168 try 169 { 170 LOG.debug( "logging entry with Dn {} with the event {}", message.getEntry().getDn(), 171 message.getChangeType() ); 172 PROVIDER_LOG.debug( "logging entry with Dn {} with the event {}", message.getEntry().getDn(), 173 message.getChangeType() ); 174 175 String entryCsn = message.getEntry().get( SchemaConstants.ENTRY_CSN_AT ).getString(); 176 journal.put( partitionTxn, entryCsn, message ); 177 } 178 catch ( Exception e ) 179 { 180 LOG.warn( "Failed to insert the entry into syncrepl log", e ); 181 PROVIDER_LOG.error( "Failed to insert the entry into syncrepl log", e ); 182 } 183 } 184 185 186 /** 187 * Deletes the queue (to remove the log) and recreates a new queue instance 188 * with the same queue name. Also creates the corresponding message producer 189 * 190 * @throws Exception If the queue can't be deleted 191 */ 192 public void truncate() throws Exception 193 { 194 } 195 196 197 /** 198 * Re-create the queue 199 * @throws Exception If the creation has failed 200 */ 201 public void recreate() throws Exception 202 { 203 LOG.debug( "recreating the queue for the replica id {}", replicaId ); 204 } 205 206 207 /** 208 * Stop the EventLog 209 * 210 * @throws Exception If the stop failed 211 */ 212 public void stop() throws Exception 213 { 214 PROVIDER_LOG.debug( "Stopping the EventLog for replicaId {}", replicaId ); 215 216 // Close the producer and session, DO NOT close connection 217 if ( journal != null ) 218 { 219 journal.close( partitionTxn ); 220 } 221 222 journal = null; 223 224 if ( recman != null ) 225 { 226 recman.close(); 227 } 228 229 recman = null; 230 } 231 232 233 /** 234 * {@inheritDoc} 235 */ 236 @Override 237 public boolean equals( Object obj ) 238 { 239 if ( !( obj instanceof ReplicaEventLog ) ) 240 { 241 return false; 242 } 243 244 ReplicaEventLog other = ( ReplicaEventLog ) obj; 245 246 return replicaId == other.getId(); 247 } 248 249 250 /** 251 * {@inheritDoc} 252 */ 253 @Override 254 public int hashCode() 255 { 256 int result = 17; 257 result = 31 * result + searchFilter.hashCode(); 258 result = 31 * result + hostName.hashCode(); 259 260 return result; 261 } 262 263 264 /** 265 * {@inheritDoc} 266 */ 267 public int compareTo( ReplicaEventLog o ) 268 { 269 if ( this.equals( o ) ) 270 { 271 return 0; 272 } 273 274 return 1; 275 } 276 277 278 /** 279 * @return The listener 280 */ 281 public SyncReplSearchListener getPersistentListener() 282 { 283 return persistentListener; 284 } 285 286 287 /** 288 * Set the listener 289 * @param persistentListener The listener 290 */ 291 public void setPersistentListener( SyncReplSearchListener persistentListener ) 292 { 293 this.persistentListener = persistentListener; 294 } 295 296 297 /** 298 * @return The search criteria 299 */ 300 public NotificationCriteria getSearchCriteria() 301 { 302 return searchCriteria; 303 } 304 305 306 /** 307 * Stores the search criteria 308 * @param searchCriteria The search criteria 309 */ 310 public void setSearchCriteria( NotificationCriteria searchCriteria ) 311 { 312 this.searchCriteria = searchCriteria; 313 } 314 315 316 /** 317 * @return true if the consumer is in Refresh And Persist mode 318 */ 319 public boolean isRefreshNPersist() 320 { 321 return refreshNPersist; 322 } 323 324 325 /** 326 * @param refreshNPersist if true, set the EventLog in Refresh and Persist mode 327 */ 328 public void setRefreshNPersist( boolean refreshNPersist ) 329 { 330 this.refreshNPersist = refreshNPersist; 331 } 332 333 334 /** 335 * @return The replica ID 336 */ 337 public int getId() 338 { 339 return replicaId; 340 } 341 342 343 /** 344 * @return The last CSN sent by the consumer 345 */ 346 public String getLastSentCsn() 347 { 348 return lastSentCsn; 349 } 350 351 352 /** 353 * Update the last Sent CSN. If it's different from the present one, we 354 * will set the dirty flag to true, and it will be stored in DIT. 355 * 356 * @param lastSentCsn The new Sent CSN 357 */ 358 public void setLastSentCsn( String lastSentCsn ) 359 { 360 // set only if there is a change in cookie value 361 // this will avoid setting the dirty flag which eventually is used for 362 // storing the details of this log 363 if ( !lastSentCsn.equals( this.lastSentCsn ) ) 364 { 365 this.lastSentCsn = lastSentCsn; 366 dirty = true; 367 } 368 } 369 370 371 /** 372 * @return The consumer Hostname 373 */ 374 public String getHostName() 375 { 376 return hostName; 377 } 378 379 380 /** 381 * Set the consumer hostname 382 * @param hostName The consumer hostname 383 */ 384 public void setHostName( String hostName ) 385 { 386 this.hostName = hostName; 387 } 388 389 390 /** 391 * @return The searchFilter 392 */ 393 public String getSearchFilter() 394 { 395 return searchFilter; 396 } 397 398 399 /** 400 * Set the searchFilter 401 * @param searchFilter The searchFilter 402 */ 403 public void setSearchFilter( String searchFilter ) 404 { 405 this.searchFilter = searchFilter; 406 } 407 408 409 /** 410 * @return True if the consumer is not up to date 411 */ 412 public boolean isDirty() 413 { 414 return dirty; 415 } 416 417 418 /** 419 * Set the dirty flag 420 * @param dirty The current consumer status 421 */ 422 public void setDirty( boolean dirty ) 423 { 424 this.dirty = dirty; 425 } 426 427 428 /** 429 * @return The queue name 430 */ 431 public String getQueueName() 432 { 433 return "replicaId=" + replicaId; 434 } 435 436 437 /** 438 * @param consumerCsn the consumer's CSN extracted from cookie 439 * @return A cursor on top of the queue 440 * @throws Exception If the cursor can't be created 441 */ 442 public ReplicaJournalCursor getCursor( String consumerCsn ) throws Exception 443 { 444 return new ReplicaJournalCursor( partitionTxn, journal, consumerCsn ); 445 } 446 447 448 /** 449 * @return the name of this replica log 450 */ 451 public String getName() 452 { 453 return journal.getName(); 454 } 455 456 457 /** 458 * @return the number of entries present in the replica log 459 */ 460 public synchronized long count() 461 { 462 try 463 { 464 return journal.count( partitionTxn ); 465 } 466 catch ( LdapException e ) 467 { 468 throw new RuntimeException( e ); 469 } 470 } 471 472 473 public long getMaxIdlePeriod() 474 { 475 return maxIdlePeriod; 476 } 477 478 479 public void setMaxIdlePeriod( long maxIdlePeriod ) 480 { 481 if ( maxIdlePeriod <= 0 ) 482 { 483 maxIdlePeriod = DEFAULT_MAX_IDLE_PERIOD; 484 } 485 486 this.maxIdlePeriod = maxIdlePeriod; 487 } 488 489 490 public int getPurgeThresholdCount() 491 { 492 return purgeThresholdCount; 493 } 494 495 496 public void setPurgeThresholdCount( int purgeThresholdCount ) 497 { 498 if ( purgeThresholdCount <= 0 ) 499 { 500 purgeThresholdCount = DEFAULT_PURGE_THRESHOLD_COUNT; 501 } 502 503 this.purgeThresholdCount = purgeThresholdCount; 504 } 505 506 507 public Dn getConsumerEntryDn() 508 { 509 return consumerEntryDn; 510 } 511 512 513 public void setConsumerEntryDn( Dn consumerEntryDn ) 514 { 515 this.consumerEntryDn = consumerEntryDn; 516 } 517 518 519 @Override 520 public String toString() 521 { 522 return "ReplicaEventLog [hostName=" + hostName + ", searchFilter=" + searchFilter + ", lastSentCsn=" 523 + lastSentCsn + ", searchCriteria=" + searchCriteria + ", replicaId=" + replicaId 524 + ", refreshNPersist=" + refreshNPersist + ", maxInactivePeriod=" + maxIdlePeriod 525 + ", purgeThresholdCount=" + purgeThresholdCount + ", journalFile=" + journalFile 526 + ", dirty=" + dirty + ", consumerEntryDn=" + consumerEntryDn + "]"; 527 } 528}