1 /**
2 *
3 */
4 package org.treetank.io.jclouds;
5
6 import static com.google.common.base.Preconditions.checkNotNull;
7
8 import java.io.ByteArrayInputStream;
9 import java.io.DataInputStream;
10 import java.io.IOException;
11 import java.net.SocketTimeoutException;
12 import java.nio.ByteBuffer;
13 import java.util.Map;
14 import java.util.concurrent.Callable;
15 import java.util.concurrent.ExecutionException;
16
17 import org.jclouds.blobstore.BlobStore;
18 import org.jclouds.blobstore.domain.Blob;
19 import org.treetank.bucket.BucketFactory;
20 import org.treetank.bucket.UberBucket;
21 import org.treetank.bucket.interfaces.IBucket;
22 import org.treetank.exception.TTException;
23 import org.treetank.exception.TTIOException;
24 import org.treetank.io.IBackendReader;
25 import org.treetank.io.bytepipe.IByteHandler.IByteHandlerPipeline;
26
27 import com.google.common.cache.Cache;
28 import com.google.common.cache.CacheBuilder;
29 import com.google.common.io.ByteStreams;
30
31 /**
32 * Accessing the Cloud storage for reading in a multithreaded manner.
33 *
34 * @author Sebastian Graf, University of Konstanz
35 *
36 */
37 public class JCloudsReader implements IBackendReader {
38
39 // private final static File readFile = new File("/Users/sebi/Desktop/runtimeResults/readaccess.txt");
40 // private final static File downloadFile =
41 // new File("/Users/sebi/Desktop/runtimeResults/downloadaccess.txt");
42 //
43 // static final FileWriter reader;
44 // static final FileWriter download;
45 //
46 // static {
47 // try {
48 // reader = new FileWriter(readFile);
49 // download = new FileWriter(downloadFile);
50 // } catch (IOException e) {
51 // throw new RuntimeException(e);
52 // }
53 // }
54
55 private final static long POISONNUMBER = -15;
56
57 // private final static int BUCKETS_TO_PREFETCH = 3;
58
59 /** Blob Store for Reading the data. */
60 protected final BlobStore mBlobStore;
61
62 /** Factory for building Buckets. */
63 private final BucketFactory mFac;
64
65 /** Inflater to decompress. */
66 protected final IByteHandlerPipeline mByteHandler;
67
68 /** Resource name of this container and the database. */
69 protected final String mResourceName;
70
71 /** Cache for reading data. */
72 protected final Cache<Long, IBucket> mCache;
73
74 //
75 // /** Executing read requests. */
76 // private final ExecutorService mReaderService;
77 //
78 // /** CompletionService for getting aware of concluded tasks. */
79 // private final CompletionService<Map.Entry<Long, IBucket>> mReaderCompletion;
80 //
81 // /** Blocking already performing tasks. */
82 // private final ConcurrentHashMap<Long, Future<Map.Entry<Long, IBucket>>> mTasks;
83
84 public JCloudsReader(BlobStore pBlobStore, BucketFactory pFac, IByteHandlerPipeline pByteHandler,
85 String pResourceName) throws TTException {
86 mBlobStore = pBlobStore;
87 mByteHandler = pByteHandler;
88 mFac = pFac;
89 mResourceName = pResourceName;
90 mCache = CacheBuilder.newBuilder().maximumSize(100).build();
91
92 // mTasks = new ConcurrentHashMap<Long, Future<Map.Entry<Long, IBucket>>>();
93 // mReaderService = Executors.newFixedThreadPool(20);
94 //
95 // mReaderCompletion = new ExecutorCompletionService<Map.Entry<Long, IBucket>>(mReaderService);
96 // final FutureCleaner cleaner = new FutureCleaner();
97 // final ExecutorService cleanerService = Executors.newSingleThreadExecutor();
98 // cleanerService.submit(cleaner);
99 // cleanerService.shutdown();
100 }
101
102 /**
103 * {@inheritDoc}
104 */
105 @Override
106 public UberBucket readUber() throws TTIOException {
107 try {
108 final Blob blobRetrieved = mBlobStore.getBlob(mResourceName, Long.toString(-1l));
109 final DataInputStream datain = new DataInputStream(blobRetrieved.getPayload().getInput());
110 final long uberkey = datain.readLong();
111 final UberBucket bucket = (UberBucket)read(uberkey);
112 datain.close();
113 return bucket;
114 } catch (final IOException exc) {
115 throw new TTIOException(exc);
116 }
117 }
118
119 /**
120 * {@inheritDoc}
121 */
122 @Override
123 public IBucket read(long pKey) throws TTIOException {
124 // IBucket returnval = null;
125 IBucket returnval = mCache.getIfPresent(pKey);
126 if (returnval == null) {
127 try {
128 returnval = getAndprefetchBuckets(pKey);
129 // reader.write(returnval.getBucketKey() + "," + returnval.getClass().getName() + "\n");
130 // reader.flush();
131 } catch (Exception exc) {
132 throw new TTIOException(exc);
133 }
134 }
135 return returnval;
136
137 }
138
139 private final IBucket getAndprefetchBuckets(final long pId) throws InterruptedException,
140 ExecutionException {
141 IBucket returnVal = null;
142 // // Future<Map.Entry<Long, IBucket>> startTask = null;
143 // for (long i = pId; i < pId + BUCKETS_TO_PREFETCH; i++) {
144 // Future<Map.Entry<Long, IBucket>> currentTask = mTasks.remove(i);
145 // if (currentTask == null) {
146 // currentTask = mReaderCompletion.submit(new ReadTask(i));
147 // mTasks.put(i, currentTask);
148 // }
149 // if (i == pId) {
150 // startTask = currentTask;
151 // }
152 // }
153 // returnVal = startTask.get().getValue();
154 try {
155 returnVal = new ReadTask(pId).call().getValue();
156 mCache.put(pId, returnVal);
157 } catch (Exception e) {
158 e.printStackTrace();
159 }
160 return returnVal;
161 }
162
163 /**
164 * {@inheritDoc}
165 */
166 @Override
167 public void close() throws TTIOException {
168 // mReaderCompletion.submit(new PoisonTask());
169 mCache.invalidateAll();
170 // mReaderService.shutdown();
171 // try {
172 // mReaderService.awaitTermination(100, TimeUnit.SECONDS);
173 // } catch (final InterruptedException exc) {
174 // throw new TTIOException(exc);
175 // }
176 // checkState(mReaderService.isTerminated());
177 }
178
179 // /**
180 // * Cleaning up the Running-Tasks Hashmap in the background.
181 // *
182 // * @author Sebastian Graf, University of Konstanz
183 // *
184 // */
185 // class FutureCleaner implements Callable<Long> {
186 //
187 // public Long call() throws Exception {
188 // boolean run = true;
189 // while (run) {
190 // Future<Map.Entry<Long, IBucket>> element = mReaderCompletion.take();
191 // long number = element.get().getKey();
192 // if (number == POISONNUMBER) {
193 // run = false;
194 // } else {
195 // mTasks.remove(element.get().getKey());
196 // }
197 // }
198 // return POISONNUMBER;
199 // }
200 // }
201
202 /**
203 * Single task to write data to the cloud.
204 *
205 * @author Sebastian Graf, University of Konstanz
206 *
207 */
208 class ReadTask implements Callable<Map.Entry<Long, IBucket>> {
209
210 /**
211 * Bucket ID to be read.
212 */
213 final long mBucketId;
214
215 ReadTask(final long pBucketId) {
216 this.mBucketId = pBucketId;
217 }
218
219 @Override
220 public Map.Entry<Long, IBucket> call() throws Exception {
221
222 final int tryCounter = 100;
223 IBucket bucket = null;
224 Blob blob = mBlobStore.getBlob(mResourceName, Long.toString(mBucketId));
225 int i = 1;
226 while (blob == null && i <= tryCounter) {
227 Thread.sleep(i * 10);
228 blob = mBlobStore.getBlob(mResourceName, Long.toString(mBucketId));
229 i++;
230 }
231 checkNotNull(blob, "Blob %s not found", mBucketId);
232
233 // retrieving incomplete written data completely
234 boolean stayIn = false;
235 byte[] data = new byte[0];
236 do {
237 try {
238 data = ByteStreams.toByteArray(blob.getPayload().getInput());
239 final ByteBuffer buffer = ByteBuffer.wrap(data);
240 final int length = buffer.getInt();
241 if (length < data.length) {
242 stayIn = true;
243 }
244 } catch (SocketTimeoutException exc) {
245 stayIn = true;
246 }
247 } while (stayIn);
248
249 final byte[] dataWithoutSize = new byte[data.length - 4];
250 System.arraycopy(data, 4, dataWithoutSize, 0, dataWithoutSize.length);
251
252 DataInputStream datain =
253 new DataInputStream(mByteHandler.deserialize(new ByteArrayInputStream(dataWithoutSize)));
254 bucket = mFac.deserializeBucket(datain);
255 datain.close();
256 // mCache.put(mBucketId, bucket);
257 // }
258
259 // download.write(bucket.getBucketKey() + "," + bucket.getClass().getName() + "\n");
260 // download.flush();
261
262 final IBucket returnVal = bucket;
263
264 return new Map.Entry<Long, IBucket>() {
265 @Override
266 public Long getKey() {
267 return mBucketId;
268 }
269
270 @Override
271 public IBucket getValue() {
272 return returnVal;
273 }
274
275 @Override
276 public IBucket setValue(IBucket value) {
277 throw new UnsupportedOperationException();
278 }
279 };
280 }
281 }
282
283 /**
284 * Tasks for ending the cleaner .
285 *
286 * @author Sebastian Graf, University of Konstanz
287 *
288 */
289 class PoisonTask implements Callable<Map.Entry<Long, IBucket>> {
290
291 /**
292 * {@inheritDoc}
293 */
294 @Override
295 public Map.Entry<Long, IBucket> call() throws Exception {
296 return new Map.Entry<Long, IBucket>() {
297
298 @Override
299 public Long getKey() {
300 return POISONNUMBER;
301 }
302
303 @Override
304 public IBucket getValue() {
305 return null;
306 }
307
308 @Override
309 public IBucket setValue(IBucket value) {
310 throw new UnsupportedOperationException();
311 }
312 };
313 }
314 }
315
316 }