/**
 * Copyright (c) 2011, University of Konstanz, Distributed Systems Group
 * All rights reserved.
 * 
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions are met:
 * * Redistributions of source code must retain the above copyright
 * notice, this list of conditions and the following disclaimer.
 * * Redistributions in binary form must reproduce the above copyright
 * notice, this list of conditions and the following disclaimer in the
 * documentation and/or other materials provided with the distribution.
 * * Neither the name of the University of Konstanz nor the
 * names of its contributors may be used to endorse or promote products
 * derived from this software without specific prior written permission.
 * 
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
 * ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
 * WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
 * DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDERS BE LIABLE FOR ANY
 * DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
 * (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
 * LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
 * ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
 * SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

package org.treetank.access;

import static com.google.common.base.Objects.toStringHelper;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
import static org.treetank.access.BucketReadTrx.dataBucketOffset;

import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import org.jclouds.javax.annotation.Nullable;
import org.treetank.access.conf.ConstructorProps;
import org.treetank.api.IBucketWriteTrx;
import org.treetank.api.IMetaEntry;
import org.treetank.api.IData;
import org.treetank.api.ISession;
import org.treetank.bucket.BucketFactory;
import org.treetank.bucket.IConstants;
import org.treetank.bucket.IndirectBucket;
import org.treetank.bucket.MetaBucket;
import org.treetank.bucket.DataBucket;
import org.treetank.bucket.DataBucket.DeletedData;
import org.treetank.bucket.RevisionRootBucket;
import org.treetank.bucket.UberBucket;
import org.treetank.bucket.interfaces.IBucket;
import org.treetank.bucket.interfaces.IReferenceBucket;
import org.treetank.exception.TTException;
import org.treetank.exception.TTIOException;
import org.treetank.io.IBackendWriter;
import org.treetank.io.ILog;
import org.treetank.io.LRULog;
import org.treetank.io.LogKey;
import org.treetank.io.LogValue;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.io.ByteArrayDataInput;
import com.google.common.io.ByteArrayDataOutput;
import com.google.common.io.ByteStreams;

/**
 * <h1>BucketWriteTrx</h1>
 * 
 * <p>
 * See {@link BucketReadTrx}.
 * </p>
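 * 
 * <p>
 * A minimal usage sketch; how the transaction instance is obtained from the
 * {@link ISession} is session-specific and elided here:
 * </p>
 * 
 * <pre>
 * IBucketWriteTrx wtx = ...; // obtained via the session
 * wtx.setData(data);         // modifications are buffered in a log
 * wtx.commit();              // persisted asynchronously in the background
 * wtx.close();               // waits for a still-running commit
 * </pre>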
 */
public final class BucketWriteTrx implements IBucketWriteTrx {

    /** BackendWriter to serialize to. */
    private final IBackendWriter mBackendWriter;

    /** Reference to the actual uberBucket. */
    private UberBucket mNewUber;

    /** Reference to the actual revRoot. */
    private RevisionRootBucket mNewRoot;

    /** Reference to the actual metaBucket. */
    private MetaBucket mNewMeta;

    /** Delegate for read access. */
    private BucketReadTrx mDelegate;

    /** Executor running the commit in progress. */
    private final ExecutorService mCommitInProgress;

    /** Bucket-Factory to clone buckets. */
    private final BucketFactory mBucketFac;

    /** Current log instance to write to. */
    private ILog mLog;

    /** Former log instance, used while a commit is in progress. */
    @Nullable
    private ILog mFormerLog;

    /** Transient cache buffering former data-bucket hashes. */
    private final Cache<Long, byte[]> mFormerDataBucketHashes;

    /**
     * Standard constructor.
     * 
     * @param pSession
     *            {@link ISession} reference
     * @param pUberBucket
     *            root of the resource
     * @param pWriter
     *            writer this transaction should write to
     * @param pRepresentRev
     *            revision to represent
     * @throws TTException
     *             if an I/O error occurs
     */
    protected BucketWriteTrx(final ISession pSession, final UberBucket pUberBucket,
        final IBackendWriter pWriter, final long pRepresentRev) throws TTException {
        mBackendWriter = pWriter;

        final long revkey =
            BucketReadTrx.dereferenceLeafOfTree(pWriter,
                pUberBucket.getReferenceKeys()[IReferenceBucket.GUARANTEED_INDIRECT_OFFSET], pRepresentRev)[IConstants.INDIRECT_BUCKET_COUNT.length];
        final RevisionRootBucket revBucket = (RevisionRootBucket)pWriter.read(revkey);
        final MetaBucket metaBucket =
            (MetaBucket)pWriter.read(revBucket.getReferenceKeys()[RevisionRootBucket.META_REFERENCE_OFFSET]);

        mCommitInProgress = Executors.newSingleThreadExecutor();
        mDelegate = new BucketReadTrx(pSession, pUberBucket, revBucket, metaBucket, pWriter);
        mBucketFac = new BucketFactory(pSession.getConfig().mDataFac, pSession.getConfig().mMetaFac);

        mLog =
            new LRULog(new File(pSession.getConfig().mProperties
                .getProperty(ConstructorProps.RESOURCEPATH)),
                pSession.getConfig().mDataFac, pSession.getConfig().mMetaFac);

        mFormerLog = mLog;
        mFormerDataBucketHashes = CacheBuilder.newBuilder().maximumSize(16384).build();
        setUpTransaction(pUberBucket, revBucket, metaBucket, pSession, pRepresentRev);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public long setData(final IData pData) throws TTException {

        checkState(!mDelegate.isClosed(), "Transaction already closed");
        // Retrieve the key of the data and locate its bucket.
        final long dataKey = pData.getDataKey();
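        // Note: IConstants.INDIRECT_BUCKET_COUNT holds per-level shift widths, so the
        // right-shift below maps a data key to the sequence number of its leaf bucket,
        // while dataBucketOffset(..) extracts the slot within that bucket (effectively
        // the key modulo the bucket fan-out).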
        final long seqBucketKey = dataKey >> IConstants.INDIRECT_BUCKET_COUNT[3];
        final int dataBucketOffset = dataBucketOffset(dataKey);
        final LogValue container = prepareDataBucket(dataKey);
        final DataBucket modified = ((DataBucket)container.getModified());
        final DataBucket complete = ((DataBucket)container.getComplete());
        modified.setData(dataBucketOffset, pData);
        complete.setData(dataBucketOffset, pData);
        mLog.put(new LogKey(false, IConstants.INDIRECT_BUCKET_COUNT.length, seqBucketKey), container);
        return dataKey;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void removeData(final IData pData) throws TTException {
        checkState(!mDelegate.isClosed(), "Transaction already closed");
        checkNotNull(pData);
        final long dataBucketKey = pData.getDataKey() >> IConstants.INDIRECT_BUCKET_COUNT[3];
        final LogValue container = prepareDataBucket(pData.getDataKey());
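        // Deletion is represented by a tombstone: a DeletedData entry overwrites the
        // slot in both the modified and the complete bucket, so the removal itself is
        // versioned like any other modification.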
        final IData delData = new DeletedData(pData.getDataKey());
        ((DataBucket)container.getComplete()).setData(dataBucketOffset(pData.getDataKey()), delData);
        ((DataBucket)container.getModified()).setData(dataBucketOffset(pData.getDataKey()), delData);

        mLog.put(new LogKey(false, IConstants.INDIRECT_BUCKET_COUNT.length, dataBucketKey), container);
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public IData getData(final long pDataKey) throws TTIOException {
        checkState(!mDelegate.isClosed(), "Transaction already closed");
        // Calculate bucket and offset for the given dataKey.
        final long dataBucketKey = pDataKey >> IConstants.INDIRECT_BUCKET_COUNT[3];
        final int dataBucketOffset = dataBucketOffset(pDataKey);

        final LogKey key = new LogKey(false, IConstants.INDIRECT_BUCKET_COUNT.length, dataBucketKey);
        LogValue container = mLog.get(key);
        IData item = null;
        // Bucket was modified within this version...
        if (container.getModified() != null) {
            // ..if the data itself was not touched in this version, take it from the
            // complete status of the bucket,..
            if (((DataBucket)container.getModified()).getData(dataBucketOffset) == null) {
                item = ((DataBucket)container.getComplete()).getData(dataBucketOffset);
            }// ..otherwise take the freshly modified one.
            else {
                item = ((DataBucket)container.getModified()).getData(dataBucketOffset);
            }
            checkNotNull(item, "Item must be set!");
            item = mDelegate.checkItemIfDeleted(item);
        }// ...otherwise the bucket may have been modified within a former version,...
        else {
            // ..so check the former log...
            container = mFormerLog.get(key);
            // ..for a bucket modified within that version or...
            if (container.getModified() != null) {
                // ..read from a clone of its complete status, since the former log may be
                // persisted concurrently by a running commit.
                item = clone((DataBucket)container.getComplete()).getData(dataBucketOffset);
                checkNotNull(item, "Item must be set!");
                item = mDelegate.checkItemIfDeleted(item);
            }// bucket was modified long before, so read it through the delegate.
            else {
                item = mDelegate.getData(pDataKey);
            }
        }
        return item;
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void commit() throws TTException {
        checkState(!mDelegate.isClosed(), "Transaction already closed");

        mDelegate.mSession.waitForRunningCommit();

        final UberBucket uber = clone(mNewUber);
        final MetaBucket meta = clone(mNewMeta);
        final RevisionRootBucket rev = clone(mNewRoot);
        // Store the reference to the former log..
        mFormerLog = mLog;
        // ..and open a new log for modifications arriving while the commit runs.
        mLog =
            new LRULog(new File(mDelegate.mSession.getConfig().mProperties
                .getProperty(ConstructorProps.RESOURCEPATH)), mDelegate.mSession
                .getConfig().mDataFac, mDelegate.mSession.getConfig().mMetaFac);

        mDelegate.mSession.setRunningCommit(mCommitInProgress.submit(new CommitCallable(uber, rev, meta)));
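        // The commit itself runs asynchronously on mCommitInProgress: the CommitCallable
        // persists the snapshot cloned above from the former log, while this transaction
        // immediately continues on the fresh log.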
        // Uncomment to enforce blocking behaviour:
        // mDelegate.mSession.waitForRunningCommit();

        setUpTransaction(uber, rev, meta, mDelegate.mSession, uber.getRevisionNumber());

    }

    /**
     * {@inheritDoc}
     */
    @Override
    public void commitBlocked() throws TTException {
        commit();
        mDelegate.mSession.waitForRunningCommit();
    }

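    /**
     * Clearing the transaction log by closing the current log instance.
     * 
     * @throws TTIOException
     *             if the log cannot be closed
     */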
    public void clearLog() throws TTIOException {
        mLog.close();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean close() throws TTIOException {
        mCommitInProgress.shutdown();
        mDelegate.mSession.waitForRunningCommit();
        if (!mDelegate.isClosed()) {
            mDelegate.close();

            try {
                // Try to close the log; it may already be closed if a commit
                // was the last operation.
                mLog.close();
                mBackendWriter.close();
            } catch (IllegalStateException e) {
                // Do nothing.
            }
            mDelegate.mSession.deregisterBucketTrx(this);
            return true;
        } else {
            return false;
        }
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public long incrementDataKey() {
        checkState(!mDelegate.isClosed(), "Transaction already closed");
        return mNewRoot.incrementMaxDataKey();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public long getRevision() throws TTIOException {
        checkState(!mDelegate.isClosed(), "Transaction already closed");
        return mNewRoot.getRevision();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public boolean isClosed() {
        return mDelegate.isClosed();
    }

    /**
     * {@inheritDoc}
     */
    @Override
    public MetaBucket getMetaBucket() {
        checkState(!mDelegate.isClosed(), "Transaction already closed");
        return mNewMeta;
    }

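    /**
     * Preparing the {@link DataBucket} responsible for the given data key within the
     * current log, creating a fresh bucket or combining former versions if necessary.
     * 
     * @param pDataKey
     *            key of the data whose bucket is prepared
     * @return the {@link LogValue} container holding the modified and the complete
     *         status of the prepared bucket
     * @throws TTException
     *             if anything weird happens
     */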
    private LogValue prepareDataBucket(final long pDataKey) throws TTException {

        final long seqDataBucketKey = pDataKey >> IConstants.INDIRECT_BUCKET_COUNT[3];

        final LogKey key = new LogKey(false, IConstants.INDIRECT_BUCKET_COUNT.length, seqDataBucketKey);
        // See if there is already a bucket on the data-bucket level.
        LogValue container = mLog.get(key);
        // if not,...
        if (container.getModified() == null) {
            // ..start preparing a new container to be logged.
            final LogKey indirectKey = preparePathToLeaf(false, mNewRoot, pDataKey);
            final LogValue indirectContainer = mLog.get(indirectKey);
            final int dataOffset = dataBucketOffset(seqDataBucketKey);
            final long bucketKey =
                ((IndirectBucket)indirectContainer.getModified()).getReferenceKeys()[dataOffset];
            final long newBucketKey = mNewUber.incrementBucketCounter();
            // if a former version of this bucket already exists...
            if (bucketKey != 0) {
                // ...get the number of revisions necessary to restore a full bucket.
                final int revToRestore =
                    Integer.parseInt(mDelegate.mSession.getConfig().mProperties
                        .getProperty(ConstructorProps.NUMBERTORESTORE));

                // Gather all data, from the former log..
                final LogValue formerModified = mFormerLog.get(key);
                // ..and from the former revision.
                final List<DataBucket> formerBuckets = mDelegate.getSnapshotBuckets(seqDataBucketKey);
                // declare summarized buckets.
                final List<DataBucket> bucketList = new ArrayList<DataBucket>();

                // Look if a former log is currently in the process of being written...
                if (formerModified.getModified() != null) {
                    // ..if so, check that the modified bucket...
                    final DataBucket currentlyInProgress = (DataBucket)formerModified.getModified();
                    // ...is newer than the one recently persisted (to avoid race conditions).
                    if (formerBuckets.isEmpty()
                        || formerBuckets.get(0).getBucketKey() < currentlyInProgress.getBucketKey()) {
                        bucketList.add((DataBucket)clone(currentlyInProgress));
                    }
                }

                // All currently written elements are inserted, so if the bucket list is empty
                // or the snapshot is still smaller than the restore window...
                if (bucketList.isEmpty() || formerBuckets.size() < revToRestore) {
                    // ...add all former buckets...
                    bucketList.addAll(formerBuckets);
                }// ..otherwise take all but the oldest bucket into account.
                else {
                    if (formerBuckets.size() > 1) {
                        bucketList.addAll(formerBuckets.subList(0, formerBuckets.size() - 1));
                    }
                }

                // Transform into an array..
                final DataBucket[] buckets = bucketList.toArray(new DataBucket[bucketList.size()]);
                // ..check that the number of buckets is valid, and combine them into the container.
                checkState(buckets.length > 0);
                container =
                    mDelegate.mSession.getConfig().mRevision.combineBucketsForModification(revToRestore,
                        newBucketKey, buckets, mNewRoot.getRevision() % revToRestore == 0);
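
                // The last argument flags a full dump whenever the revision number is a
                // multiple of the restore window, which keeps the number of versions that
                // must be combined on read bounded.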
            }// ...if no former bucket exists, create an entirely new one.
            else {
                final DataBucket newBucket = new DataBucket(newBucketKey, IConstants.NULLDATA);
                container = new LogValue(newBucket, newBucket);
            }
            ((IndirectBucket)indirectContainer.getModified()).setReferenceKey(dataOffset, newBucketKey);
            ((IndirectBucket)indirectContainer.getModified()).setReferenceHash(dataOffset,
                IConstants.NON_HASHED);
            mLog.put(indirectKey, indirectContainer);
            mLog.put(key, container);
        }
        return container;
    }

    /**
     * Getting a {@link LogKey} for the last IndirectBucket referencing any new/modified
     * bucket.
     * 
     * @param pIsRootLevel
     *            whether this dereferencing walk searches for a RevisionRootBucket or a
     *            DataBucket; needed because both subtrees use the same keys.
     * @param pBucket
     *            where to start the tree-walk: either from an UberBucket (related to new
     *            RevisionRootBuckets) or from a RevisionRootBucket (related to new
     *            DataBuckets).
     * @param pElementKey
     *            key to be dereferenced
     * @return the key of the container representing the last level
     * @throws TTException
     *             if anything weird happens
     */
    private LogKey preparePathToLeaf(final boolean pIsRootLevel, final IReferenceBucket pBucket,
        final long pElementKey) throws TTException {

        // Computing the order numbers within all levels; the order number is the position
        // in the sequence of all buckets within the same level.
        // Ranges are: level 0: 0-127; level 1: 0-16383; level 2: 0-2097151;
        // level 3: 0-268435455; level 4: 0-34359738367.
        long seqBucketKey = -1;
        // Since the revision points to a bucket, the sequence key is based directly on the
        // last indirect layer when searching for a revision,...
        if (pIsRootLevel) {
            seqBucketKey = pElementKey >> IConstants.INDIRECT_BUCKET_COUNT[3];
        } // ...whereas one layer above is used for the datas, since their offsets point to
          // datas instead of buckets.
        else {
            seqBucketKey = pElementKey >> IConstants.INDIRECT_BUCKET_COUNT[2];
        }

        final long[] orderNumber = new long[IConstants.INDIRECT_BUCKET_COUNT.length];
        for (int level = 0; level < orderNumber.length; level++) {
            orderNumber[level] = seqBucketKey >> IConstants.INDIRECT_BUCKET_COUNT[level];
        }
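
        // Each entry of orderNumber is the bucket's position within its level: shifting
        // the leaf-level sequence key by the level's shift width yields the index of the
        // ancestor bucket on that level.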

        IReferenceBucket bucket = null;
        IReferenceBucket parentBucket = pBucket;
        LogKey key = null;
        LogKey parentKey = new LogKey(pIsRootLevel, -1, 0);

        // Iterate through all levels...
        for (int level = 0; level < orderNumber.length; level++) {
            // ...and see if the bucket requested is already in the log.
            key = new LogKey(pIsRootLevel, level, orderNumber[level]);
            LogValue container = mLog.get(key);
            // if the bucket does not exist,..
            if (container.getModified() == null) {
                // ..create a new bucket..
                final long newKey = mNewUber.incrementBucketCounter();
                bucket = new IndirectBucket(newKey);

                // ..and compute the offset of the new bucket.
                final int offset = dataBucketOffset(orderNumber[level]);

                // If the same bucket existed in a former version, locate it either in the
                // former log...
                container = mFormerLog.get(key);
                IReferenceBucket oldBucket = null;
                if (container.getModified() != null) {
                    oldBucket = (IReferenceBucket)container.getModified();
                }
                // ...or via the reference key stored at the offset within the parent...
                else if (parentBucket.getReferenceKeys()[offset] != 0) {
                    oldBucket =
                        (IReferenceBucket)mBackendWriter.read(parentBucket.getReferenceKeys()[offset]);
                }
                // ...and copy all its references to the new bucket.
                if (oldBucket != null) {
                    for (int i = 0; i < oldBucket.getReferenceKeys().length; i++) {
                        bucket.setReferenceKey(i, oldBucket.getReferenceKeys()[i]);
                        bucket.setReferenceHash(i, oldBucket.getReferenceHashs()[i]);
                    }
                }

                // Set the newKey at the computed offset...
                parentBucket.setReferenceKey(offset, newKey);
                // ...and mark the hash offset as not written...
                parentBucket.setReferenceHash(offset, IConstants.NON_HASHED);
                // ...and put the parent reference to the log...
                container = new LogValue(parentBucket, parentBucket);
                // ..unless the parent is referenced as UberBucket or RevisionRootBucket
                // within the write transaction itself...
                if (level > 0) {
                    mLog.put(parentKey, container);
                }
                // ..but set the reference of the current bucket in every case.
                container = new LogValue(bucket, bucket);
                mLog.put(key, container);

            } // if the bucket is already in the log, simply get it from there.
            else {
                bucket = (IReferenceBucket)container.getModified();
            }
            // finally, set the new bucket key for the next level.
            parentKey = key;
            parentBucket = bucket;
        }

        // Return reference to leaf of indirect tree.
        return key;
    }

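    /**
     * Setting up a fresh UberBucket, RevisionRootBucket and MetaBucket for the next
     * revision this write transaction operates on.
     * 
     * @param pUberOld
     *            former UberBucket
     * @param pRootToRepresent
     *            former RevisionRootBucket the new revision is based on
     * @param pMetaOld
     *            former MetaBucket whose entries are cloned
     * @param pSession
     *            current {@link ISession}
     * @param pRepresentRev
     *            revision to represent
     * @throws TTException
     *             if anything weird happens
     */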
    private void setUpTransaction(final UberBucket pUberOld, final RevisionRootBucket pRootToRepresent,
        final MetaBucket pMetaOld, final ISession pSession, final long pRepresentRev) throws TTException {

        mNewUber =
            new UberBucket(pUberOld.getBucketCounter() + 1, pUberOld.getRevisionNumber() + 1, pUberOld
                .getBucketCounter() + 1);
        mNewUber.setReferenceKey(IReferenceBucket.GUARANTEED_INDIRECT_OFFSET,
            pUberOld.getReferenceKeys()[IReferenceBucket.GUARANTEED_INDIRECT_OFFSET]);
        mNewUber.setReferenceHash(IReferenceBucket.GUARANTEED_INDIRECT_OFFSET, IConstants.NON_HASHED);
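
        // Copying the reference key while resetting the hash implements copy-on-write:
        // unchanged subtrees stay shared with the former revision and only the path to
        // modified buckets is rewritten.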

        // Prepare the indirect tree to hold the reference to the prepared revision root.
        final LogKey indirectKey = preparePathToLeaf(true, mNewUber, mNewUber.getRevisionNumber());
        final LogValue indirectContainer = mLog.get(indirectKey);
        final int offset = dataBucketOffset(mNewUber.getRevisionNumber());

        // Get the previous revision root bucket and use its data to initialize a fresh
        // revision root including the pointers.
        mNewRoot =
            new RevisionRootBucket(mNewUber.incrementBucketCounter(), pRepresentRev + 1, pRootToRepresent
                .getMaxDataKey());
        mNewRoot.setReferenceKey(IReferenceBucket.GUARANTEED_INDIRECT_OFFSET, pRootToRepresent
            .getReferenceKeys()[IReferenceBucket.GUARANTEED_INDIRECT_OFFSET]);
        mNewRoot.setReferenceHash(IReferenceBucket.GUARANTEED_INDIRECT_OFFSET, IConstants.NON_HASHED);

        // Set the new revRoot at the correct offset.
        ((IndirectBucket)indirectContainer.getModified()).setReferenceKey(offset, mNewRoot.getBucketKey());
        ((IndirectBucket)indirectContainer.getModified()).setReferenceHash(offset, IConstants.NON_HASHED);

        mLog.put(indirectKey, indirectContainer);

        // Set up a new metaBucket and link it to the new root.
        final Set<Map.Entry<IMetaEntry, IMetaEntry>> keySet = pMetaOld.entrySet();
        mNewMeta = new MetaBucket(mNewUber.incrementBucketCounter());
        for (final Map.Entry<IMetaEntry, IMetaEntry> entry : keySet) {
            mNewMeta.put(clone(entry.getKey()), clone(entry.getValue()));
        }
        mNewRoot.setReferenceKey(RevisionRootBucket.META_REFERENCE_OFFSET, mNewMeta.getBucketKey());
        mNewRoot.setReferenceHash(RevisionRootBucket.META_REFERENCE_OFFSET, IConstants.NON_HASHED);

    }

    /**
     * {@inheritDoc}
     */
    @Override
    public String toString() {
        return toStringHelper(this).add("mDelegate", mDelegate).add("mBackendWriter", mBackendWriter).add(
            "mRootBucket", mNewRoot).toString();
    }

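    /**
     * Deep-cloning an {@link IMetaEntry} by serializing and deserializing it again.
     * 
     * @param pToClone
     *            entry to clone
     * @return the cloned entry
     * @throws TTIOException
     *             if the (de)serialization fails
     */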
    private IMetaEntry clone(final IMetaEntry pToClone) throws TTIOException {
        final ByteArrayDataOutput output = ByteStreams.newDataOutput();
        pToClone.serialize(output);
        final ByteArrayDataInput input = ByteStreams.newDataInput(output.toByteArray());
        return mDelegate.mSession.getConfig().mMetaFac.deserializeEntry(input);
    }

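    /**
     * Deep-cloning an {@link IBucket} by serializing and deserializing it again.
     * 
     * @param pToClone
     *            bucket to clone
     * @param <E>
     *            concrete bucket type
     * @return the cloned bucket
     * @throws TTIOException
     *             if the (de)serialization fails
     */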
    @SuppressWarnings("unchecked")
    private <E extends IBucket> E clone(final E pToClone) throws TTIOException {
        final ByteArrayDataOutput output = ByteStreams.newDataOutput();
        pToClone.serialize(output);
        final ByteArrayDataInput input = ByteStreams.newDataInput(output.toByteArray());
        return (E)mBucketFac.deserializeBucket(input);
    }

    /**
     * Closing the former log if it is not needed any more.
     * 
     * @throws TTIOException
     *             if anything weird happens
     */
    private void closeFormerLog() throws TTIOException {
        if (mFormerLog != null && !mFormerLog.isClosed()) {
            mFormerLog.close();
        }
    }

    /**
     * Persistence-task to be performed within a commit.
     * 
     * @author Sebastian Graf, University of Konstanz
     * 
     */
    class CommitCallable implements Callable<Void> {

        final MetaBucket mMeta;
        final RevisionRootBucket mRoot;
        final UberBucket mUber;

        /**
         * Constructor.
         * 
         * @param pUber
         *            to persist
         * @param pRoot
         *            to persist
         * @param pMeta
         *            to persist
         */
        CommitCallable(final UberBucket pUber, final RevisionRootBucket pRoot, final MetaBucket pMeta) {
            mUber = pUber;
            mRoot = pRoot;
            mMeta = pMeta;
        }

        /**
         * {@inheritDoc}
         */
        @Override
        public Void call() throws Exception {
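            // Persist bottom-up: first the data subtree below the revision root, then the
            // revision subtree below the uber bucket, so every parent is written with the
            // hashes of its already-persisted children.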
            // iterate the data tree.
            iterateSubtree(false);
            // get the IndirectBucket directly referenced from the RevRoot.
            final LogKey dataKey = new LogKey(false, 0, 0);
            final IReferenceBucket dataBuck = (IReferenceBucket)mFormerLog.get(dataKey).getModified();
            // Check if there are any modifications and, if so, set the hash for the indirect buckets.
            if (dataBuck != null) {
                final byte[] dataHash = dataBuck.secureHash().asBytes();
                mBackendWriter.write(dataBuck);
                mRoot.setReferenceHash(RevisionRootBucket.GUARANTEED_INDIRECT_OFFSET, dataHash);
            }
            // Do the same for the meta bucket, which is always written.
            final byte[] metaHash = mMeta.secureHash().asBytes();
            mBackendWriter.write(mMeta);
            mRoot.setReferenceHash(RevisionRootBucket.META_REFERENCE_OFFSET, metaHash);

            // iterate the revision tree.
            iterateSubtree(true);

            final LogKey revKey = new LogKey(true, 0, 0);
            final IReferenceBucket revBuck = (IReferenceBucket)mFormerLog.get(revKey).getModified();
            final byte[] revHash = revBuck.secureHash().asBytes();
            mBackendWriter.write(revBuck);
            mUber.setReferenceHash(UberBucket.GUARANTEED_INDIRECT_OFFSET, revHash);
            mBackendWriter.writeUberBucket(mUber);

            ((Session)mDelegate.mSession).setLastCommittedUberBucket(mUber);
            mDelegate = new BucketReadTrx(mDelegate.mSession, mUber, mRoot, mMeta, mBackendWriter);
            closeFormerLog();

            return null;
        }

        /**
         * Iterating through the subtree preorder-wise and adapting the hashes recursively.
         * 
         * @param pRootLevel
         *            whether the iteration takes place on the root level
         * @throws TTException
         *             if anything weird happens
         */
        private void iterateSubtree(final boolean pRootLevel) throws TTException {
            IReferenceBucket currentRefBuck;
            // Stack for caching the next elements (namely the right siblings and the children).
            final Stack<LogKey> childAndRightSib = new Stack<LogKey>();
            // Stack to cache the path to the root.
            final Stack<LogKey> pathToRoot = new Stack<LogKey>();
            // Push the first Indirect under the RevRoot to the stack.
            childAndRightSib.push(new LogKey(pRootLevel, 0, 0));
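
            // The traversal is a preorder depth-first walk driven by the explicit
            // childAndRightSib stack, while pathToRoot tracks the current ancestor chain
            // so finished subtrees can bubble their hashes upwards.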

            // if no data was written in this version, return early.
            if (mFormerLog.get(childAndRightSib.peek()).getModified() == null) {
                return;
            }

            // while there are children or right siblings left...
            while (!childAndRightSib.isEmpty()) {
                // ...get the next element including the modified bucket.
                final LogKey key = childAndRightSib.pop();
                final IBucket val = mFormerLog.get(key).getModified();

                // if the bucket is an instance of a ReferenceBucket, it is not a leaf and..
                if (val instanceof IReferenceBucket) {
                    currentRefBuck = (IReferenceBucket)val;

                    // ..represents either a new child in the tree (if the level of the new
                    // key is deeper than that of the last one on the path to the root)...
                    if (pathToRoot.isEmpty() || key.getLevel() > pathToRoot.peek().getLevel()) {
                        // in this case, push the new child onto the path to the root.
                        pathToRoot.push(key);
                    }// ...or a right sibling, in which case the entire subtree left of the
                     // current key must be finished first.
                    else {
                        LogKey childKey;
                        // for all elements on the stack..
                        do {
                            // ..compute the checksum recursively (popping the child first so
                            // that its parent remains on top of the stack) until..
                            childKey = pathToRoot.pop();
                            final LogKey parentKey = pathToRoot.peek();
                            adaptHash(parentKey, childKey);
                        }// ..the left part of the parent's subtree is done.
                        while (childKey.getLevel() > key.getLevel());
                        // Push the own key to the path since we are going one step down now.
                        pathToRoot.push(key);
                    }

                    // Iterate through all reference hashes from behind,..
                    for (int i = currentRefBuck.getReferenceHashs().length - 1; i >= 0; i--) {
                        // ..read the hashes and check..
                        final byte[] hash = currentRefBuck.getReferenceHashs()[i];
                        // ..if an offset is marked as fresh.
                        if (Arrays.equals(hash, IConstants.NON_HASHED)) {
                            // This offset marks a child to descend to.
                            final LogKey toPush =
                                new LogKey(pRootLevel, key.getLevel() + 1,
                                    (key.getSeq() << IConstants.INDIRECT_BUCKET_COUNT[3]) + i);
                            childAndRightSib.push(toPush);
                        }
                    }

                } // if we are on the leaf level...
                else {
                    // ...and over the revroot, take the revroot directly..
                    if (pRootLevel) {
                        final byte[] hash = mRoot.secureHash().asBytes();
                        mBackendWriter.write(mRoot);
                        final LogKey parentKey = pathToRoot.peek();
                        final IReferenceBucket parentVal =
                            (IReferenceBucket)mFormerLog.get(parentKey).getModified();
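                        // The subtraction below is a power-of-two modulo: it yields
                        // key.getSeq() modulo the bucket fan-out, i.e. the child's slot
                        // within its parent (the same idiom recurs below and in adaptHash).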
                        final int parentOffset =
                            (int)(key.getSeq() - ((key.getSeq() >> IConstants.INDIRECT_BUCKET_COUNT[3]) << IConstants.INDIRECT_BUCKET_COUNT[3]));
                        parentVal.setReferenceHash(parentOffset, hash);

                    } // otherwise, retrieve the bucket from the log.
                    else {
                        // if there is a hash marked as "to set" but no suitable bucket is
                        // available, a former commit is currently in progress, so the
                        // bucket must be retrieved from the backend.
                        if (val == null) {
                            final IReferenceBucket parent =
                                (IReferenceBucket)mFormerLog.get(pathToRoot.peek()).getModified();
                            final int parentOffset =
                                (int)(key.getSeq() - ((key.getSeq() >> IConstants.INDIRECT_BUCKET_COUNT[3]) << IConstants.INDIRECT_BUCKET_COUNT[3]));
                            byte[] persistedHash = mFormerDataBucketHashes.getIfPresent(key.getSeq());
                            if (persistedHash == null) {
                                final IBucket persistedBucket =
                                    mBackendWriter.read(parent.getReferenceKeys()[parentOffset]);
                                persistedHash = persistedBucket.secureHash().asBytes();
                                mFormerDataBucketHashes.put(key.getSeq(), persistedHash);
                            }
                            parent.setReferenceHash(parentOffset, persistedHash);
                        }// otherwise take it from the log.
                        else {
                            // ..we need to have a DataBucket and...
                            checkState(val instanceof DataBucket);
                            // ...we adapt the parent with the own hash.
                            final LogKey parentKey = pathToRoot.peek();
                            adaptHash(parentKey, key);
                        }
                    }

                }
            }

            // After the preorder traversal, we need to adapt the path to the root with the
            // missing checksums, but only if there are any modifications.
            if (!pathToRoot.isEmpty()) {
                do {
                    final LogKey child = pathToRoot.pop();
                    final LogKey parent = pathToRoot.peek();
                    adaptHash(parent, child);
                } while (pathToRoot.size() > 1);
            }

        }

        /**
         * Adapting the hash of a bucket and storing it in the parent bucket.
         * 
         * @param pParentKey
         *            the {@link LogKey} of the parent bucket
         * @param pChildKey
         *            the {@link LogKey} of the own bucket
         * @throws TTException
         *             if anything weird happens
         */
        private void adaptHash(final LogKey pParentKey, final LogKey pChildKey) throws TTException {
            final IBucket val = mFormerLog.get(pChildKey).getModified();
            final byte[] hash = val.secureHash().asBytes();
            mBackendWriter.write(val);
            if (val instanceof DataBucket) {
                mFormerDataBucketHashes.put(pChildKey.getSeq(), hash);
            }

            final IReferenceBucket parent = (IReferenceBucket)mFormerLog.get(pParentKey).getModified();
            final int parentOffset =
                (int)(pChildKey.getSeq() - ((pChildKey.getSeq() >> IConstants.INDIRECT_BUCKET_COUNT[3]) << IConstants.INDIRECT_BUCKET_COUNT[3]));
            parent.setReferenceHash(parentOffset, hash);
        }
    }
}