001/* 002 * Licensed to the Apache Software Foundation (ASF) under one 003 * or more contributor license agreements. See the NOTICE file 004 * distributed with this work for additional information 005 * regarding copyright ownership. The ASF licenses this file 006 * to you under the Apache License, Version 2.0 (the 007 * "License"); you may not use this file except in compliance 008 * with the License. You may obtain a copy of the License at 009 * 010 * http://www.apache.org/licenses/LICENSE-2.0 011 * 012 * Unless required by applicable law or agreed to in writing, 013 * software distributed under the License is distributed on an 014 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 015 * KIND, either express or implied. See the License for the 016 * specific language governing permissions and limitations 017 * under the License. 018 * 019 */ 020 021package org.apache.directory.ldap.client.api.future; 022 023 024import java.util.concurrent.BlockingQueue; 025import java.util.concurrent.Future; 026import java.util.concurrent.LinkedBlockingQueue; 027import java.util.concurrent.TimeUnit; 028 029import org.apache.directory.api.ldap.model.message.Response; 030import org.apache.directory.ldap.client.api.LdapConnection; 031 032 033/** 034 * A Future implementation used in LdapConnection operations. 035 * 036 * @param <R> The result type returned by this Future's <tt>get</tt> method 037 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a> 038 */ 039public class ResponseFuture<R extends Response> implements Future<Response> 040{ 041 /** the blocking queue holding LDAP responses */ 042 protected BlockingQueue<R> queue; 043 044 /** flag to determine if this future is cancelled */ 045 protected boolean cancelled = false; 046 047 /** If the request has been cancelled because of an exception it will be stored here */ 048 protected Throwable cause; 049 050 /** The messageID for this future */ 051 protected int messageId; 052 053 /** The connection used by the request */ 054 protected LdapConnection connection; 055 056 057 /** 058 * Creates a new instance of ResponseFuture. 059 * 060 * @param connection The LdapConnection used by the request 061 * @param messageId The associated message ID 062 */ 063 public ResponseFuture( LdapConnection connection, int messageId ) 064 { 065 queue = new LinkedBlockingQueue<>(); 066 this.messageId = messageId; 067 this.connection = connection; 068 } 069 070 071 /** 072 * {@inheritDoc} 073 */ 074 @Override 075 public boolean cancel( boolean mayInterruptIfRunning ) 076 { 077 if ( cancelled ) 078 { 079 return cancelled; 080 } 081 082 // set the cancel flag first 083 cancelled = true; 084 085 // Send an abandonRequest only if this future exists 086 if ( !connection.isRequestCompleted( messageId ) ) 087 { 088 connection.abandon( messageId ); 089 } 090 091 // then clear the queue, cause the might be some incoming messages before this abandon request 092 // hits the server 093 queue.clear(); 094 095 return cancelled; 096 } 097 098 099 /** 100 * {@inheritDoc} 101 * @throws InterruptedException if the operation has been cancelled by client 102 */ 103 @Override 104 public R get() throws InterruptedException 105 { 106 return queue.take(); 107 } 108 109 110 /** 111 * Set the associated Response in this Future 112 * 113 * @param response The response to add into the Future 114 * @throws InterruptedException if the operation has been cancelled by client 115 */ 116 public void set( R response ) throws InterruptedException 117 { 118 queue.add( response ); 119 } 120 121 122 /** 123 * {@inheritDoc} 124 * @throws InterruptedException if the operation has been cancelled by client 125 */ 126 @Override 127 public R get( long timeout, TimeUnit unit ) throws InterruptedException 128 { 129 return queue.poll( timeout, unit ); 130 } 131 132 133 /** 134 * {@inheritDoc} 135 */ 136 @Override 137 public boolean isCancelled() 138 { 139 return cancelled; 140 } 141 142 143 /** 144 * This operation is not supported in this implementation of Future. 145 * 146 * {@inheritDoc} 147 */ 148 @Override 149 public boolean isDone() 150 { 151 throw new UnsupportedOperationException( "Operation not supported" ); 152 } 153 154 155 /** 156 * @return the cause 157 */ 158 public Throwable getCause() 159 { 160 return cause; 161 } 162 163 164 /** 165 * Associate a cause to the ResponseFuture 166 * @param cause the cause to set 167 */ 168 public void setCause( Throwable cause ) 169 { 170 this.cause = cause; 171 } 172 173 174 /** 175 * Cancel the Future 176 * 177 */ 178 public void cancel() 179 { 180 // set the cancel flag first 181 cancelled = true; 182 } 183 184 185 /** 186 * {@inheritDoc} 187 */ 188 @Override 189 public String toString() 190 { 191 StringBuilder sb = new StringBuilder(); 192 193 sb.append( "[msgId : " ).append( messageId ).append( ", " ); 194 sb.append( "size : " ).append( queue.size() ).append( ", " ); 195 sb.append( "Canceled :" ).append( cancelled ).append( "]" ); 196 197 return sb.toString(); 198 } 199}