View Javadoc

1   package org.treetank.access;
2   
3   import java.io.File;
4   import java.io.IOException;
5   import java.io.RandomAccessFile;
6   import java.nio.ByteBuffer;
7   import java.nio.channels.FileChannel;
8   import java.nio.channels.FileLock;
9   import java.nio.channels.OverlappingFileLockException;
10  import java.util.Arrays;
11  
12  import org.treetank.api.IBucketWriteTrx;
13  import org.treetank.api.IFilelistenerWriteTrx;
14  import org.treetank.api.ISession;
15  import org.treetank.exception.TTException;
16  import org.treetank.exception.TTIOException;
17  import org.treetank.filelistener.file.data.FileData;
18  import org.treetank.filelistener.file.data.FilelistenerMetaDataFactory.MetaKey;
19  import org.treetank.filelistener.file.data.FilelistenerMetaDataFactory.MetaValue;
20  
21  import com.google.common.base.Preconditions;
22  import com.google.common.io.ByteArrayDataOutput;
23  import com.google.common.io.ByteStreams;
24  
25  /**
26   * @author Andreas Rain
27   */
28  public class FilelistenerWriteTrx implements IFilelistenerWriteTrx {
29  
30      //    private static final Logger LOGGER = LoggerFactory.getLogger(FilelistenerWriteTrx.class);
31  
32      /** Session for abort/commit. */
33      private final ISession mSession;
34  
35      /** Delegator for the read access */
36      private final FilelistenerReadTrx mDelegate;
37  
38      /**
39       * {@inheritDoc}
40       */
41      public FilelistenerWriteTrx(IBucketWriteTrx pPageTrx, ISession pSession) throws TTException {
42          mSession = pSession;
43          mDelegate = new FilelistenerReadTrx(pPageTrx);
44      }
45  
46      /**
47       * {@inheritDoc}
48       */
49      @Override
50      public String[] getFilePaths() {
51          return mDelegate.getFilePaths();
52      }
53  
54      /**
55       * {@inheritDoc}
56       */
57      public int getCount() {
58          return mDelegate.getCount();
59      }
60  
61      /**
62       * {@inheritDoc}
63       */
64      @Override
65      public boolean fileExists(String pRelativePath) {
66          return mDelegate.fileExists(pRelativePath);
67      }
68  
69      /**
70       * {@inheritDoc}
71       */
72      @Override
73      public File getFullFile(String pRelativePath) throws TTIOException, IOException {
74          return mDelegate.getFullFile(pRelativePath);
75      }
76  
77      /**
78       * {@inheritDoc}
79       */
80      @Override
81      public void close() throws TTIOException {
82          mDelegate.close();
83      }
84  
85      /**
86       * {@inheritDoc}
87       */
88      @Override
89      public boolean isClosed() {
90          return mDelegate.isClosed();
91      }
92  
93      /**
94       * {@inheritDoc}
95       */
96      @Override
97      public synchronized void addEmptyFile(String pRelativePath) throws TTException, IOException {
98          MetaKey key = new MetaKey(pRelativePath);
99          MetaValue value = new MetaValue(FilelistenerReadTrx.emptyFileKey);
100         getBucketTransaction().getMetaBucket().put(key, value);
101 
102         return;
103     }
104 
105     /**
106      * {@inheritDoc}
107      * 
108      * @throws TTException
109      * @throws IOException
110      */
111     @Override
112     public synchronized void addFile(File pFile, String pRelativePath) throws TTException, IOException {
113         // LOGGER.info("Adding file " + pFile.getName());
114 
115         int readingAmount = 0;
116 
117         @SuppressWarnings("resource")
118         FileChannel ch = new RandomAccessFile(pFile, "rw").getChannel();
119         FileLock lock = null;
120 
121         while (lock == null) {
122             try {
123                 lock = ch.tryLock();
124             } catch (OverlappingFileLockException e) {
125                 // File is already locked in this thread or virtual machine
126             }
127         }
128 
129         // LOGGER.info("Filesize: " + ch.size());
130 
131         ByteBuffer buffer = ByteBuffer.allocate(FileData.FILENODESIZE);
132 
133         // LOGGER.debug("Successfully initialized byte source.");
134         readingAmount += ch.read(buffer);
135 
136         // LOGGER.debug("First readAmount: " + readingAmount);
137         if (readingAmount <= 0) {
138             MetaKey key = new MetaKey(pRelativePath);
139             MetaValue value = new MetaValue(FilelistenerReadTrx.emptyFileKey);
140             getBucketTransaction().getMetaBucket().put(key, value);
141 
142             return;
143         }
144 
145         long newKey = getBucketTransaction().incrementDataKey();
146 
147         if (fileExists(pRelativePath)) {
148             removeFile(pRelativePath);
149         }
150 
151         // Setting a new header file data
152         MetaKey key = new MetaKey(pRelativePath);
153         MetaValue value = new MetaValue(newKey);
154 
155         // And adding it to the meta map
156         // LOGGER.debug("Metakeypair setup");
157         getBucketTransaction().getMetaBucket().put(key, value);
158 
159         // Creating and setting the headerdata.
160         FileData headerData = new FileData(newKey, buffer.array(), true, false);
161 
162         getBucketTransaction().setData(headerData);
163 
164         // Creating and setting following datas based on the file size.
165         FileData data;
166 
167         int currentReadingAmount = 0;
168         // LOGGER.info("Iterating file content");
169         while ((currentReadingAmount = ch.read(buffer = ByteBuffer.allocate(FileData.FILENODESIZE))) > 0) {
170             // LOGGER.debug("Curr. read amount: " + currentReadingAmount);
171             byte[] slice = Arrays.copyOf(buffer.array(), currentReadingAmount);
172 
173             long dataKey = getBucketTransaction().incrementDataKey();
174             data = new FileData(dataKey, slice, false, false);
175 
176             getBucketTransaction().setData(data);
177 
178             readingAmount += currentReadingAmount;
179         }
180 
181         ByteArrayDataOutput size = ByteStreams.newDataOutput();
182         size.writeInt(readingAmount);
183 
184         data = new FileData(getBucketTransaction().incrementDataKey(), size.toByteArray(), false, true);
185 
186         getBucketTransaction().setData(data);
187 
188         Preconditions.checkArgument(getBucketTransaction().getData(newKey) != null);
189         lock.release();
190         ch.close();
191     }
192 
193     /**
194      * {@inheritDoc}
195      */
196     @Override
197     public synchronized void removeFile(String pRelativePath) throws TTException {
198         // If the file already exists we just override it
199         // and remove the last meta entry since the key won't be correct anymore.
200         getBucketTransaction().getMetaBucket().remove(new MetaKey(pRelativePath));
201     }
202 
203     /**
204      * {@inheritDoc}
205      */
206     @Override
207     public void commit() throws TTException {
208         checkAccessAndCommit();
209 
210         // ICommitStrategy uber page.
211         getBucketTransaction().commit();
212     }
213 
214     /**
215      * {@inheritDoc}
216      */
217     @Override
218     public void commitBlocked() throws TTException {
219         checkAccessAndCommit();
220 
221         // ICommitStrategy uber page.
222         getBucketTransaction().commitBlocked();
223     }
224 
225     /**
226      * Checking write access and intermediate commit.
227      * 
228      * @throws TTException
229      *             if anything weird happens
230      */
231     private void checkAccessAndCommit() throws TTException {
232 
233         mDelegate.assertNotClosed();
234     }
235 
236     /**
237      * {@inheritDoc}
238      */
239     @Override
240     public void abort() throws TTException {
241         mDelegate.assertNotClosed();
242 
243         long revisionToSet = 0;
244         revisionToSet = mDelegate.mPageReadTrx.getRevision() - 1;
245 
246         getBucketTransaction().close();
247 
248         // Reset internal transaction state to last committed uber page.
249         mDelegate.setPageTransaction(mSession.beginBucketWtx(revisionToSet));
250     }
251 
252     /**
253      * Getter for superclasses.
254      * 
255      * @return The state of this transaction.
256      */
257     private BucketWriteTrx getBucketTransaction() {
258 
259         return (BucketWriteTrx)mDelegate.mPageReadTrx;
260     }
261 }