1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28 package org.treetank.access;
29
30 import static com.google.common.base.Objects.toStringHelper;
31 import static com.google.common.base.Preconditions.checkArgument;
32 import static com.google.common.base.Preconditions.checkState;
33
34 import java.io.File;
35 import java.util.Set;
36 import java.util.concurrent.CopyOnWriteArraySet;
37 import java.util.concurrent.ExecutionException;
38 import java.util.concurrent.Future;
39 import java.util.concurrent.atomic.AtomicBoolean;
40 import java.util.concurrent.atomic.AtomicReference;
41
42 import org.treetank.access.conf.ResourceConfiguration;
43 import org.treetank.access.conf.SessionConfiguration;
44 import org.treetank.access.conf.StorageConfiguration;
45 import org.treetank.api.IBucketReadTrx;
46 import org.treetank.api.IBucketWriteTrx;
47 import org.treetank.api.ISession;
48 import org.treetank.bucket.IConstants;
49 import org.treetank.bucket.MetaBucket;
50 import org.treetank.bucket.RevisionRootBucket;
51 import org.treetank.bucket.UberBucket;
52 import org.treetank.bucket.interfaces.IReferenceBucket;
53 import org.treetank.exception.TTException;
54 import org.treetank.exception.TTIOException;
55 import org.treetank.io.IBackendReader;
56 import org.treetank.io.IBackendWriter;
57 import org.treetank.io.IOUtils;
58
59
60
61
62
63
64
65
66 public final class Session implements ISession {
67
68
69 private final ResourceConfiguration mResourceConfig;
70
71
72 protected final SessionConfiguration mSessionConfig;
73
74
75 private final Storage mDatabase;
76
77
78 private AtomicReference<UberBucket> mLastCommittedUberBucket;
79
80
81 private final Set<IBucketReadTrx> mBucketTrxs;
82
83
84 private transient boolean mClosed;
85
86
87 private AtomicBoolean mWriteTransactionUsed;
88
89
90 private Future<Void> mCommitRunning = null;
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106 protected Session(final Storage pStorage, final ResourceConfiguration pResourceConf,
107 final SessionConfiguration pSessionConf, final UberBucket pBucket) throws TTException {
108 mDatabase = pStorage;
109 mResourceConfig = pResourceConf;
110 mSessionConfig = pSessionConf;
111 mBucketTrxs = new CopyOnWriteArraySet<IBucketReadTrx>();
112 mClosed = false;
113 mLastCommittedUberBucket = new AtomicReference<UberBucket>(pBucket);
114 mWriteTransactionUsed = new AtomicBoolean(false);
115 }
116
117 public IBucketReadTrx beginBucketRtx(final long pRevKey) throws TTException {
118 waitForRunningCommit();
119 assertAccess(pRevKey);
120 final IBackendReader bucketReader = mResourceConfig.mBackend.getReader();
121 final RevisionRootBucket revBucket =
122 (RevisionRootBucket)bucketReader
123 .read(BucketReadTrx.dereferenceLeafOfTree(bucketReader, mLastCommittedUberBucket.get()
124 .getReferenceKeys()[IReferenceBucket.GUARANTEED_INDIRECT_OFFSET], pRevKey)[IConstants.INDIRECT_BUCKET_COUNT.length]);
125 final MetaBucket metaBucket =
126 (MetaBucket)bucketReader
127 .read(revBucket.getReferenceKeys()[RevisionRootBucket.META_REFERENCE_OFFSET]);
128 final BucketReadTrx trx =
129 new BucketReadTrx(this, mLastCommittedUberBucket.get(), revBucket, metaBucket, bucketReader);
130 mBucketTrxs.add(trx);
131 return trx;
132 }
133
134 public IBucketWriteTrx beginBucketWtx() throws TTException {
135 return beginBucketWtx(mLastCommittedUberBucket.get().getRevisionNumber());
136 }
137
138 public IBucketWriteTrx beginBucketWtx(final long mRepresentRevision) throws TTException {
139 checkState(mWriteTransactionUsed.compareAndSet(false, true),
140 "Only one WriteTransaction per Session is allowed");
141 waitForRunningCommit();
142 assertAccess(mRepresentRevision);
143 final IBackendWriter backendWriter = mResourceConfig.mBackend.getWriter();
144 final IBucketWriteTrx trx =
145 new BucketWriteTrx(this, mLastCommittedUberBucket.get(), backendWriter, mRepresentRevision);
146 mBucketTrxs.add(trx);
147 return trx;
148 }
149
150
151
152
153 public synchronized boolean close() throws TTException {
154 waitForRunningCommit();
155 if (!mClosed) {
156
157 for (final IBucketReadTrx rtx : mBucketTrxs) {
158
159 if (rtx instanceof BucketWriteTrx) {
160 ((BucketWriteTrx)rtx).clearLog();
161 }
162 rtx.close();
163 }
164
165
166 mLastCommittedUberBucket = null;
167 mBucketTrxs.clear();
168 mResourceConfig.mBackend.close();
169 mDatabase.mSessions.remove(mSessionConfig.getResource());
170 mClosed = true;
171 return true;
172 } else {
173 return false;
174 }
175 }
176
177
178
179
180 public boolean truncate() throws TTException {
181 checkState(!mClosed, "Session must be opened to truncate.");
182 waitForRunningCommit();
183 if (mResourceConfig.mBackend.truncate()) {
184
185 for (final IBucketReadTrx rtx : mBucketTrxs) {
186 rtx.close();
187 }
188
189 mLastCommittedUberBucket = null;
190 mBucketTrxs.clear();
191 mDatabase.mSessions.remove(mSessionConfig.getResource());
192 mClosed = true;
193 return IOUtils.recursiveDelete(new File(new File(mDatabase.getLocation(),
194 StorageConfiguration.Paths.Data.getFile().getName()), mSessionConfig.getResource()));
195 } else {
196 return false;
197 }
198 }
199
200
201
202
203
204
205
206
207 private void assertAccess(final long pRevision) throws TTIOException {
208 checkState(!mClosed, "Session is already closed.");
209 checkArgument(pRevision <= getMostRecentVersion(), "Revision must not be bigger than %s",
210 getMostRecentVersion());
211 }
212
213 protected void setLastCommittedUberBucket(final UberBucket pBucket) {
214 this.mLastCommittedUberBucket.set(pBucket);
215 }
216
217
218
219
220 @Override
221 public long getMostRecentVersion() throws TTIOException {
222 waitForRunningCommit();
223 return mLastCommittedUberBucket.get().getRevisionNumber();
224 }
225
226
227
228
229 @Override
230 public ResourceConfiguration getConfig() {
231 return mResourceConfig;
232 }
233
234
235
236
237 @Override
238 public boolean deregisterBucketTrx(IBucketReadTrx pTrx) throws TTIOException {
239 if (pTrx instanceof IBucketWriteTrx) {
240 mWriteTransactionUsed.set(false);
241 }
242 return mBucketTrxs.remove(pTrx);
243 }
244
245
246
247
248 @Override
249 public String toString() {
250 return toStringHelper(this).add("mResourceConfig", mResourceConfig).add("mSessionConfig",
251 mSessionConfig).add("mLastCommittedUberBucket", mLastCommittedUberBucket).add(
252 "mLastCommittedUberBucket", mBucketTrxs).toString();
253 }
254
255
256
257
258 @Override
259 public void waitForRunningCommit() throws TTIOException {
260 if (mCommitRunning != null) {
261 try {
262
263 mCommitRunning.get();
264
265 mCommitRunning = null;
266 } catch (InterruptedException | ExecutionException exc) {
267 throw new TTIOException(exc);
268 }
269 }
270 }
271
272
273
274
275 @Override
276 public void setRunningCommit(Future<Void> pRunningCommit) {
277 mCommitRunning = pRunningCommit;
278
279 }
280 }