View Javadoc

1   /**
2    * 
3    */
4   package org.treetank.io.jclouds;
5   
6   import static com.google.common.base.Preconditions.checkNotNull;
7   
8   import java.io.ByteArrayInputStream;
9   import java.io.DataInputStream;
10  import java.io.IOException;
11  import java.net.SocketTimeoutException;
12  import java.nio.ByteBuffer;
13  import java.util.Map;
14  import java.util.concurrent.Callable;
15  import java.util.concurrent.ExecutionException;
16  
17  import org.jclouds.blobstore.BlobStore;
18  import org.jclouds.blobstore.domain.Blob;
19  import org.treetank.bucket.BucketFactory;
20  import org.treetank.bucket.UberBucket;
21  import org.treetank.bucket.interfaces.IBucket;
22  import org.treetank.exception.TTException;
23  import org.treetank.exception.TTIOException;
24  import org.treetank.io.IBackendReader;
25  import org.treetank.io.bytepipe.IByteHandler.IByteHandlerPipeline;
26  
27  import com.google.common.cache.Cache;
28  import com.google.common.cache.CacheBuilder;
29  import com.google.common.io.ByteStreams;
30  
31  /**
32   * Accessing the Cloud storage for reading in a multithreaded manner.
33   * 
34   * @author Sebastian Graf, University of Konstanz
35   * 
36   */
37  public class JCloudsReader implements IBackendReader {
38  
39      // private final static File readFile = new File("/Users/sebi/Desktop/runtimeResults/readaccess.txt");
40      // private final static File downloadFile =
41      // new File("/Users/sebi/Desktop/runtimeResults/downloadaccess.txt");
42      //
43      // static final FileWriter reader;
44      // static final FileWriter download;
45      //
46      // static {
47      // try {
48      // reader = new FileWriter(readFile);
49      // download = new FileWriter(downloadFile);
50      // } catch (IOException e) {
51      // throw new RuntimeException(e);
52      // }
53      // }
54  
55      private final static long POISONNUMBER = -15;
56  
57      // private final static int BUCKETS_TO_PREFETCH = 3;
58  
59      /** Blob Store for Reading the data. */
60      protected final BlobStore mBlobStore;
61  
62      /** Factory for building Buckets. */
63      private final BucketFactory mFac;
64  
65      /** Inflater to decompress. */
66      protected final IByteHandlerPipeline mByteHandler;
67  
68      /** Resource name of this container and the database. */
69      protected final String mResourceName;
70  
71      /** Cache for reading data. */
72      protected final Cache<Long, IBucket> mCache;
73  
74      //
75      // /** Executing read requests. */
76      // private final ExecutorService mReaderService;
77      //
78      // /** CompletionService for getting aware of concluded tasks. */
79      // private final CompletionService<Map.Entry<Long, IBucket>> mReaderCompletion;
80      //
81      // /** Blocking already performing tasks. */
82      // private final ConcurrentHashMap<Long, Future<Map.Entry<Long, IBucket>>> mTasks;
83  
84      public JCloudsReader(BlobStore pBlobStore, BucketFactory pFac, IByteHandlerPipeline pByteHandler,
85          String pResourceName) throws TTException {
86          mBlobStore = pBlobStore;
87          mByteHandler = pByteHandler;
88          mFac = pFac;
89          mResourceName = pResourceName;
90          mCache = CacheBuilder.newBuilder().maximumSize(100).build();
91  
92          // mTasks = new ConcurrentHashMap<Long, Future<Map.Entry<Long, IBucket>>>();
93          // mReaderService = Executors.newFixedThreadPool(20);
94          //
95          // mReaderCompletion = new ExecutorCompletionService<Map.Entry<Long, IBucket>>(mReaderService);
96          // final FutureCleaner cleaner = new FutureCleaner();
97          // final ExecutorService cleanerService = Executors.newSingleThreadExecutor();
98          // cleanerService.submit(cleaner);
99          // cleanerService.shutdown();
100     }
101 
102     /**
103      * {@inheritDoc}
104      */
105     @Override
106     public UberBucket readUber() throws TTIOException {
107         try {
108             final Blob blobRetrieved = mBlobStore.getBlob(mResourceName, Long.toString(-1l));
109             final DataInputStream datain = new DataInputStream(blobRetrieved.getPayload().getInput());
110             final long uberkey = datain.readLong();
111             final UberBucket bucket = (UberBucket)read(uberkey);
112             datain.close();
113             return bucket;
114         } catch (final IOException exc) {
115             throw new TTIOException(exc);
116         }
117     }
118 
119     /**
120      * {@inheritDoc}
121      */
122     @Override
123     public IBucket read(long pKey) throws TTIOException {
124         // IBucket returnval = null;
125         IBucket returnval = mCache.getIfPresent(pKey);
126         if (returnval == null) {
127             try {
128                 returnval = getAndprefetchBuckets(pKey);
129                 // reader.write(returnval.getBucketKey() + "," + returnval.getClass().getName() + "\n");
130                 // reader.flush();
131             } catch (Exception exc) {
132                 throw new TTIOException(exc);
133             }
134         }
135         return returnval;
136 
137     }
138 
139     private final IBucket getAndprefetchBuckets(final long pId) throws InterruptedException,
140         ExecutionException {
141         IBucket returnVal = null;
142         // // Future<Map.Entry<Long, IBucket>> startTask = null;
143         // for (long i = pId; i < pId + BUCKETS_TO_PREFETCH; i++) {
144         // Future<Map.Entry<Long, IBucket>> currentTask = mTasks.remove(i);
145         // if (currentTask == null) {
146         // currentTask = mReaderCompletion.submit(new ReadTask(i));
147         // mTasks.put(i, currentTask);
148         // }
149         // if (i == pId) {
150         // startTask = currentTask;
151         // }
152         // }
153         // returnVal = startTask.get().getValue();
154         try {
155             returnVal = new ReadTask(pId).call().getValue();
156             mCache.put(pId, returnVal);
157         } catch (Exception e) {
158             e.printStackTrace();
159         }
160         return returnVal;
161     }
162 
163     /**
164      * {@inheritDoc}
165      */
166     @Override
167     public void close() throws TTIOException {
168         // mReaderCompletion.submit(new PoisonTask());
169         mCache.invalidateAll();
170         // mReaderService.shutdown();
171         // try {
172         // mReaderService.awaitTermination(100, TimeUnit.SECONDS);
173         // } catch (final InterruptedException exc) {
174         // throw new TTIOException(exc);
175         // }
176         // checkState(mReaderService.isTerminated());
177     }
178 
179     // /**
180     // * Cleaning up the Running-Tasks Hashmap in the background.
181     // *
182     // * @author Sebastian Graf, University of Konstanz
183     // *
184     // */
185     // class FutureCleaner implements Callable<Long> {
186     //
187     // public Long call() throws Exception {
188     // boolean run = true;
189     // while (run) {
190     // Future<Map.Entry<Long, IBucket>> element = mReaderCompletion.take();
191     // long number = element.get().getKey();
192     // if (number == POISONNUMBER) {
193     // run = false;
194     // } else {
195     // mTasks.remove(element.get().getKey());
196     // }
197     // }
198     // return POISONNUMBER;
199     // }
200     // }
201 
202     /**
203      * Single task to write data to the cloud.
204      * 
205      * @author Sebastian Graf, University of Konstanz
206      * 
207      */
208     class ReadTask implements Callable<Map.Entry<Long, IBucket>> {
209 
210         /**
211          * Bucket ID to be read.
212          */
213         final long mBucketId;
214 
215         ReadTask(final long pBucketId) {
216             this.mBucketId = pBucketId;
217         }
218 
219         @Override
220         public Map.Entry<Long, IBucket> call() throws Exception {
221 
222             final int tryCounter = 100;
223             IBucket bucket = null;
224             Blob blob = mBlobStore.getBlob(mResourceName, Long.toString(mBucketId));
225             int i = 1;
226             while (blob == null && i <= tryCounter) {
227                 Thread.sleep(i * 10);
228                 blob = mBlobStore.getBlob(mResourceName, Long.toString(mBucketId));
229                 i++;
230             }
231             checkNotNull(blob, "Blob %s not found", mBucketId);
232 
233             // retrieving incomplete written data completely
234             boolean stayIn = false;
235             byte[] data = new byte[0];
236             do {
237                 try {
238                     data = ByteStreams.toByteArray(blob.getPayload().getInput());
239                     final ByteBuffer buffer = ByteBuffer.wrap(data);
240                     final int length = buffer.getInt();
241                     if (length < data.length) {
242                         stayIn = true;
243                     }
244                 } catch (SocketTimeoutException exc) {
245                     stayIn = true;
246                 }
247             } while (stayIn);
248 
249             final byte[] dataWithoutSize = new byte[data.length - 4];
250             System.arraycopy(data, 4, dataWithoutSize, 0, dataWithoutSize.length);
251 
252             DataInputStream datain =
253                 new DataInputStream(mByteHandler.deserialize(new ByteArrayInputStream(dataWithoutSize)));
254             bucket = mFac.deserializeBucket(datain);
255             datain.close();
256             // mCache.put(mBucketId, bucket);
257             // }
258 
259             // download.write(bucket.getBucketKey() + "," + bucket.getClass().getName() + "\n");
260             // download.flush();
261 
262             final IBucket returnVal = bucket;
263 
264             return new Map.Entry<Long, IBucket>() {
265                 @Override
266                 public Long getKey() {
267                     return mBucketId;
268                 }
269 
270                 @Override
271                 public IBucket getValue() {
272                     return returnVal;
273                 }
274 
275                 @Override
276                 public IBucket setValue(IBucket value) {
277                     throw new UnsupportedOperationException();
278                 }
279             };
280         }
281     }
282 
283     /**
284      * Tasks for ending the cleaner .
285      * 
286      * @author Sebastian Graf, University of Konstanz
287      * 
288      */
289     class PoisonTask implements Callable<Map.Entry<Long, IBucket>> {
290 
291         /**
292          * {@inheritDoc}
293          */
294         @Override
295         public Map.Entry<Long, IBucket> call() throws Exception {
296             return new Map.Entry<Long, IBucket>() {
297 
298                 @Override
299                 public Long getKey() {
300                     return POISONNUMBER;
301                 }
302 
303                 @Override
304                 public IBucket getValue() {
305                     return null;
306                 }
307 
308                 @Override
309                 public IBucket setValue(IBucket value) {
310                     throw new UnsupportedOperationException();
311                 }
312             };
313         }
314     }
315 
316 }