View Javadoc

1   /**
2    * 
3    */
4   package org.treetank.io.jclouds;
5   
6   import java.io.ByteArrayOutputStream;
7   import java.io.DataOutputStream;
8   import java.io.IOException;
9   import java.nio.ByteBuffer;
10  import java.util.concurrent.Callable;
11  
12  import org.jclouds.blobstore.BlobStore;
13  import org.jclouds.blobstore.domain.Blob;
14  import org.jclouds.blobstore.domain.BlobBuilder;
15  import org.treetank.bucket.BucketFactory;
16  import org.treetank.bucket.UberBucket;
17  import org.treetank.bucket.interfaces.IBucket;
18  import org.treetank.exception.TTByteHandleException;
19  import org.treetank.exception.TTException;
20  import org.treetank.exception.TTIOException;
21  import org.treetank.io.IBackendWriter;
22  import org.treetank.io.bytepipe.IByteHandler.IByteHandlerPipeline;
23  
24  /**
25   * @author Sebastian Graf, University of Konstanz
26   * 
27   */
28  public class JCloudsWriter implements IBackendWriter {
29  
30      // // START DEBUG CODE
31      // private final static File writeFile = new File("/Users/sebi/Desktop/runtimeResults/writeaccess.txt");
32      // private final static File uploadFile = new File("/Users/sebi/Desktop/runtimeResults/uploadaccess.txt");
33      //
34      // static final FileWriter writer;
35      // static final FileWriter upload;
36      //
37      // static {
38      // try {
39      // writer = new FileWriter(writeFile);
40      // upload = new FileWriter(uploadFile);
41      // } catch (IOException e) {
42      // throw new RuntimeException(e);
43      // }
44      // }
45  
46      // private final static long POISONNUMBER = -15;
47  
48      /** Delegate for reader. */
49      private final JCloudsReader mReader;
50  
51      // static long readTime = 0;
52      // static int readCounter = 0;
53      // static long writeTime = 0;
54      // static int writeCounter = 0;
55  
56      // private final ConcurrentHashMap<Long, Future<Long>> mRunningWriteTasks;
57      // private final CompletionService<Long> mWriterCompletion;
58      // /** Executing read requests. */
59      // private final ExecutorService mWriterService;
60  
61      public JCloudsWriter(BlobStore pBlobStore, BucketFactory pFac, IByteHandlerPipeline pByteHandler,
62          String pResourceName) throws TTException {
63          mReader = new JCloudsReader(pBlobStore, pFac, pByteHandler, pResourceName);
64  
65          // mWriterService = Executors.newFixedThreadPool(20);
66          // mRunningWriteTasks = new ConcurrentHashMap<Long, Future<Long>>();
67          // mWriterCompletion = new ExecutorCompletionService<Long>(mWriterService);
68  
69          // final WriteFutureCleaner cleaner = new WriteFutureCleaner();
70          // final ExecutorService cleanerService = Executors.newSingleThreadExecutor();
71          // cleanerService.submit(cleaner);
72          // cleanerService.shutdown();
73  
74      }
75  
76      /**
77       * {@inheritDoc}
78       */
79      @Override
80      public IBucket read(long pKey) throws TTIOException {
81          // Future<Long> task = mRunningWriteTasks.get(pKey);
82          // if (task != null) {
83          // try {
84          // task.get();
85          // } catch (InterruptedException | ExecutionException exc) {
86          // throw new TTIOException(exc);
87          // }
88          // }
89          // readCounter++;
90          // long time = System.currentTimeMillis();
91          final IBucket bucket = mReader.read(pKey);
92          // readTime = readTime + System.currentTimeMillis() - time;
93          return bucket;
94      }
95  
96      /**
97       * {@inheritDoc}
98       */
99      @Override
100     public void write(final IBucket pBucket) throws TTIOException, TTByteHandleException {
101         try {
102             // writer.write(pBucket.getBucketKey() + "," + pBucket.getClass().getName() + "\n");
103             // writer.flush();
104             //
105             // writeCounter++;
106             // long time = System.currentTimeMillis();
107             new WriteTask(pBucket).call();
108             // writeTime = writeTime + System.currentTimeMillis() - time;
109             // Future<Long> task = mWriterCompletion.submit(new WriteTask(pBucket));
110             // mRunningWriteTasks.put(pBucket.getBucketKey(), task);
111             // mReader.mCache.put(pBucket.getBucketKey(), pBucket);
112         } catch (final Exception exc) {
113             throw new TTIOException(exc);
114         }
115     }
116 
117     /**
118      * {@inheritDoc}
119      */
120     @Override
121     public void close() throws TTIOException {
122         // mWriterCompletion.submit(new PoisonTask());
123         // mWriterService.shutdown();
124         // try {
125         // mWriterService.awaitTermination(100, TimeUnit.SECONDS);
126         // } catch (final InterruptedException exc) {
127         // throw new TTIOException(exc);
128         // }
129         // checkState(mWriterService.isTerminated());
130         // System.out.println("Read time: " + readTime);
131         // System.out.println("Write time: " + writeTime);
132         // System.out.println("Read counter: " + readCounter);
133         // System.out.println("Write counter: " + writeCounter);
134         //
135         // readTime = 0;
136         // writeTime = 0;
137         // readCounter = 0;
138         // writeCounter = 0;
139         mReader.close();
140     }
141 
142     /**
143      * {@inheritDoc}
144      */
145     @Override
146     public UberBucket readUber() throws TTIOException {
147         return mReader.readUber();
148     }
149 
150     /**
151      * {@inheritDoc}
152      */
153     @Override
154     public void writeUberBucket(UberBucket pBucket) throws TTException {
155         try {
156             long key = pBucket.getBucketKey();
157             write(pBucket);
158             BlobBuilder blobbuilder = mReader.mBlobStore.blobBuilder(Long.toString(-1L));
159             Blob blob = blobbuilder.build();
160             ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
161             DataOutputStream dataOut = new DataOutputStream(byteOut);
162             dataOut.writeLong(key);
163             dataOut.close();
164             blob.setPayload(byteOut.toByteArray());
165             mReader.mBlobStore.putBlob(mReader.mResourceName, blob);
166         } catch (final IOException exc) {
167             throw new TTIOException(exc);
168         }
169 
170     }
171 
172     /**
173      * Single task to write data to the cloud.
174      * 
175      * @author Sebastian Graf, University of Konstanz
176      * 
177      */
178     class WriteTask implements Callable<Long> {
179         /**
180          * The bytes to buffer.
181          */
182         final IBucket mBucket;
183 
184         WriteTask(IBucket pBucket) {
185             this.mBucket = pBucket;
186         }
187 
188         @Override
189         public Long call() throws Exception {
190             boolean finished = false;
191 
192             while (!finished) {
193                 try {
194                     ByteArrayOutputStream byteOut = new ByteArrayOutputStream();
195                     DataOutputStream dataOut = new DataOutputStream(mReader.mByteHandler.serialize(byteOut));
196                     mBucket.serialize(dataOut);
197                     dataOut.close();
198                     
199                     //storing length in front of byte array
200                     final byte[] data = byteOut.toByteArray();
201                     final ByteBuffer buffer = ByteBuffer.allocate(4 + data.length);
202                     buffer.putInt(buffer.capacity());
203                     buffer.put(data);
204 
205                     BlobBuilder blobbuilder =
206                         mReader.mBlobStore.blobBuilder(Long.toString(mBucket.getBucketKey()));
207                     Blob blob = blobbuilder.build();
208                     
209                     blob.setPayload(buffer.array());
210                     mReader.mBlobStore.putBlob(mReader.mResourceName, blob);
211                 } catch (Exception e) {
212 
213                 }
214                 finished = mReader.mBlobStore.blobExists(mReader.mResourceName, Long.toString(mBucket.getBucketKey()));
215 
216                 // upload.write(mBucket.getBucketKey() + "," + mBucket.getClass().getName() + "\n");
217                 // upload.flush();
218             }
219 
220             return mBucket.getBucketKey();
221         }
222     }
223     //
224     // class WriteFutureCleaner implements Callable<Long> {
225     //
226     // public Long call() throws Exception {
227     // boolean run = true;
228     // while (run) {
229     // Future<Long> element = mWriterCompletion.take();
230     // if (!element.isCancelled()) {
231     // long id = element.get();
232     // if (id == POISONNUMBER) {
233     // run = false;
234     // } else {
235     // mRunningWriteTasks.remove(element.get());
236     // }
237     // }
238     // }
239     // return POISONNUMBER;
240     // }
241     // }
242     //
243     // /**
244     // * Tasks for ending the cleaner .
245     // *
246     // * @author Sebastian Graf, University of Konstanz
247     // *
248     // */
249     // class PoisonTask implements Callable<Long> {
250     //
251     // /**
252     // * {@inheritDoc}
253     // */
254     // @Override
255     // public Long call() throws Exception {
256     // return POISONNUMBER;
257     // }
258     // }
259 
260 }