001/*
002 *   Licensed to the Apache Software Foundation (ASF) under one
003 *   or more contributor license agreements.  See the NOTICE file
004 *   distributed with this work for additional information
005 *   regarding copyright ownership.  The ASF licenses this file
006 *   to you under the Apache License, Version 2.0 (the
007 *   "License"); you may not use this file except in compliance
008 *   with the License.  You may obtain a copy of the License at
009 *
010 *     http://www.apache.org/licenses/LICENSE-2.0
011 *
012 *   Unless required by applicable law or agreed to in writing,
013 *   software distributed under the License is distributed on an
014 *   "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
015 *   KIND, either express or implied.  See the License for the
016 *   specific language governing permissions and limitations
017 *   under the License.
018 *
019 */
020
021package org.apache.directory.ldap.client.api.future;
022
023
024import java.util.concurrent.BlockingQueue;
025import java.util.concurrent.Future;
026import java.util.concurrent.LinkedBlockingQueue;
027import java.util.concurrent.TimeUnit;
028
029import org.apache.directory.api.ldap.model.message.Response;
030import org.apache.directory.ldap.client.api.LdapConnection;
031
032
033/**
034 * A Future implementation used in LdapConnection operations.
035 *
036 * @param <R> The result type returned by this Future's <tt>get</tt> method
037 * @author <a href="mailto:dev@directory.apache.org">Apache Directory Project</a>
038 */
039public class ResponseFuture<R extends Response> implements Future<Response>
040{
041    /** the blocking queue holding LDAP responses */
042    protected BlockingQueue<R> queue;
043
044    /** flag to determine if this future is cancelled */
045    protected boolean cancelled = false;
046
047    /** If the request has been cancelled because of an exception  it will be stored here */
048    protected Throwable cause;
049
050    /** The messageID for this future */
051    protected int messageId;
052
053    /** The connection used by the request */
054    protected LdapConnection connection;
055
056
057    /**
058     * Creates a new instance of ResponseFuture.
059     *
060     * @param connection The LdapConnection used by the request
061     * @param messageId The associated message ID
062     */
063    public ResponseFuture( LdapConnection connection, int messageId )
064    {
065        queue = new LinkedBlockingQueue<>();
066        this.messageId = messageId;
067        this.connection = connection;
068    }
069
070
071    /**
072     * {@inheritDoc}
073     */
074    @Override
075    public boolean cancel( boolean mayInterruptIfRunning )
076    {
077        if ( cancelled )
078        {
079            return cancelled;
080        }
081
082        // set the cancel flag first
083        cancelled = true;
084
085        // Send an abandonRequest only if this future exists
086        if ( !connection.isRequestCompleted( messageId ) )
087        {
088            connection.abandon( messageId );
089        }
090
091        // then clear the queue, cause the might be some incoming messages before this abandon request
092        // hits the server
093        queue.clear();
094
095        return cancelled;
096    }
097
098
099    /**
100     * {@inheritDoc}
101     * @throws InterruptedException if the operation has been cancelled by client
102     */
103    @Override
104    public R get() throws InterruptedException
105    {
106        return queue.take();
107    }
108
109
110    /**
111     * Set the associated Response in this Future
112     * 
113     * @param response The response to add into the Future
114     * @throws InterruptedException if the operation has been cancelled by client
115     */
116    public void set( R response ) throws InterruptedException
117    {
118        queue.add( response );
119    }
120
121
122    /**
123     * {@inheritDoc}
124     * @throws InterruptedException if the operation has been cancelled by client
125     */
126    @Override
127    public R get( long timeout, TimeUnit unit ) throws InterruptedException
128    {
129        return queue.poll( timeout, unit );
130    }
131
132
133    /**
134     * {@inheritDoc}
135     */
136    @Override
137    public boolean isCancelled()
138    {
139        return cancelled;
140    }
141
142
143    /**
144     * This operation is not supported in this implementation of Future.
145     * 
146     * {@inheritDoc}
147     */
148    @Override
149    public boolean isDone()
150    {
151        throw new UnsupportedOperationException( "Operation not supported" );
152    }
153
154
155    /**
156     * @return the cause
157     */
158    public Throwable getCause()
159    {
160        return cause;
161    }
162
163
164    /**
165     * Associate a cause to the ResponseFuture
166     * @param cause the cause to set
167     */
168    public void setCause( Throwable cause )
169    {
170        this.cause = cause;
171    }
172
173
174    /**
175     * Cancel the Future
176     *
177     */
178    public void cancel()
179    {
180        // set the cancel flag first
181        cancelled = true;
182    }
183
184
185    /**
186     * {@inheritDoc}
187     */
188    @Override
189    public String toString()
190    {
191        StringBuilder sb = new StringBuilder();
192
193        sb.append( "[msgId : " ).append( messageId ).append( ", " );
194        sb.append( "size : " ).append( queue.size() ).append( ", " );
195        sb.append( "Canceled :" ).append( cancelled ).append( "]" );
196
197        return sb.toString();
198    }
199}