1 package org.treetank.access;
2
3 import java.io.File;
4 import java.io.IOException;
5 import java.io.RandomAccessFile;
6 import java.nio.ByteBuffer;
7 import java.nio.channels.FileChannel;
8 import java.nio.channels.FileLock;
9 import java.nio.channels.OverlappingFileLockException;
10 import java.util.Arrays;
11
12 import org.treetank.api.IBucketWriteTrx;
13 import org.treetank.api.IFilelistenerWriteTrx;
14 import org.treetank.api.ISession;
15 import org.treetank.exception.TTException;
16 import org.treetank.exception.TTIOException;
17 import org.treetank.filelistener.file.data.FileData;
18 import org.treetank.filelistener.file.data.FilelistenerMetaDataFactory.MetaKey;
19 import org.treetank.filelistener.file.data.FilelistenerMetaDataFactory.MetaValue;
20
21 import com.google.common.base.Preconditions;
22 import com.google.common.io.ByteArrayDataOutput;
23 import com.google.common.io.ByteStreams;
24
25
26
27
28 public class FilelistenerWriteTrx implements IFilelistenerWriteTrx {
29
30
31
32
33 private final ISession mSession;
34
35
36 private final FilelistenerReadTrx mDelegate;
37
38
39
40
41 public FilelistenerWriteTrx(IBucketWriteTrx pPageTrx, ISession pSession) throws TTException {
42 mSession = pSession;
43 mDelegate = new FilelistenerReadTrx(pPageTrx);
44 }
45
46
47
48
49 @Override
50 public String[] getFilePaths() {
51 return mDelegate.getFilePaths();
52 }
53
54
55
56
57 public int getCount() {
58 return mDelegate.getCount();
59 }
60
61
62
63
64 @Override
65 public boolean fileExists(String pRelativePath) {
66 return mDelegate.fileExists(pRelativePath);
67 }
68
69
70
71
72 @Override
73 public File getFullFile(String pRelativePath) throws TTIOException, IOException {
74 return mDelegate.getFullFile(pRelativePath);
75 }
76
77
78
79
80 @Override
81 public void close() throws TTIOException {
82 mDelegate.close();
83 }
84
85
86
87
88 @Override
89 public boolean isClosed() {
90 return mDelegate.isClosed();
91 }
92
93
94
95
96 @Override
97 public synchronized void addEmptyFile(String pRelativePath) throws TTException, IOException {
98 MetaKey key = new MetaKey(pRelativePath);
99 MetaValue value = new MetaValue(FilelistenerReadTrx.emptyFileKey);
100 getBucketTransaction().getMetaBucket().put(key, value);
101
102 return;
103 }
104
105
106
107
108
109
110
111 @Override
112 public synchronized void addFile(File pFile, String pRelativePath) throws TTException, IOException {
113
114
115 int readingAmount = 0;
116
117 @SuppressWarnings("resource")
118 FileChannel ch = new RandomAccessFile(pFile, "rw").getChannel();
119 FileLock lock = null;
120
121 while (lock == null) {
122 try {
123 lock = ch.tryLock();
124 } catch (OverlappingFileLockException e) {
125
126 }
127 }
128
129
130
131 ByteBuffer buffer = ByteBuffer.allocate(FileData.FILENODESIZE);
132
133
134 readingAmount += ch.read(buffer);
135
136
137 if (readingAmount <= 0) {
138 MetaKey key = new MetaKey(pRelativePath);
139 MetaValue value = new MetaValue(FilelistenerReadTrx.emptyFileKey);
140 getBucketTransaction().getMetaBucket().put(key, value);
141
142 return;
143 }
144
145 long newKey = getBucketTransaction().incrementDataKey();
146
147 if (fileExists(pRelativePath)) {
148 removeFile(pRelativePath);
149 }
150
151
152 MetaKey key = new MetaKey(pRelativePath);
153 MetaValue value = new MetaValue(newKey);
154
155
156
157 getBucketTransaction().getMetaBucket().put(key, value);
158
159
160 FileData headerData = new FileData(newKey, buffer.array(), true, false);
161
162 getBucketTransaction().setData(headerData);
163
164
165 FileData data;
166
167 int currentReadingAmount = 0;
168
169 while ((currentReadingAmount = ch.read(buffer = ByteBuffer.allocate(FileData.FILENODESIZE))) > 0) {
170
171 byte[] slice = Arrays.copyOf(buffer.array(), currentReadingAmount);
172
173 long dataKey = getBucketTransaction().incrementDataKey();
174 data = new FileData(dataKey, slice, false, false);
175
176 getBucketTransaction().setData(data);
177
178 readingAmount += currentReadingAmount;
179 }
180
181 ByteArrayDataOutput size = ByteStreams.newDataOutput();
182 size.writeInt(readingAmount);
183
184 data = new FileData(getBucketTransaction().incrementDataKey(), size.toByteArray(), false, true);
185
186 getBucketTransaction().setData(data);
187
188 Preconditions.checkArgument(getBucketTransaction().getData(newKey) != null);
189 lock.release();
190 ch.close();
191 }
192
193
194
195
196 @Override
197 public synchronized void removeFile(String pRelativePath) throws TTException {
198
199
200 getBucketTransaction().getMetaBucket().remove(new MetaKey(pRelativePath));
201 }
202
203
204
205
206 @Override
207 public void commit() throws TTException {
208 checkAccessAndCommit();
209
210
211 getBucketTransaction().commit();
212 }
213
214
215
216
217 @Override
218 public void commitBlocked() throws TTException {
219 checkAccessAndCommit();
220
221
222 getBucketTransaction().commitBlocked();
223 }
224
225
226
227
228
229
230
231 private void checkAccessAndCommit() throws TTException {
232
233 mDelegate.assertNotClosed();
234 }
235
236
237
238
239 @Override
240 public void abort() throws TTException {
241 mDelegate.assertNotClosed();
242
243 long revisionToSet = 0;
244 revisionToSet = mDelegate.mPageReadTrx.getRevision() - 1;
245
246 getBucketTransaction().close();
247
248
249 mDelegate.setPageTransaction(mSession.beginBucketWtx(revisionToSet));
250 }
251
252
253
254
255
256
257 private BucketWriteTrx getBucketTransaction() {
258
259 return (BucketWriteTrx)mDelegate.mPageReadTrx;
260 }
261 }