View Javadoc

1   package org.treetank.filelistener.file;
2   
3   import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
4   import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
5   import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
6   import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
7   
8   import java.io.File;
9   import java.io.FileNotFoundException;
10  import java.io.IOException;
11  import java.nio.file.FileSystems;
12  import java.nio.file.Path;
13  import java.nio.file.Paths;
14  import java.nio.file.StandardOpenOption;
15  import java.nio.file.WatchEvent;
16  import java.nio.file.WatchKey;
17  import java.nio.file.WatchService;
18  import java.util.ArrayList;
19  import java.util.HashMap;
20  import java.util.List;
21  import java.util.Map;
22  import java.util.Map.Entry;
23  import java.util.concurrent.ConcurrentHashMap;
24  import java.util.concurrent.ExecutorService;
25  import java.util.concurrent.Executors;
26  
27  import org.slf4j.Logger;
28  import org.slf4j.LoggerFactory;
29  import org.treetank.access.FilelistenerWriteTrx;
30  import org.treetank.api.IFilelistenerWriteTrx;
31  import org.treetank.api.ISession;
32  import org.treetank.exception.TTException;
33  import org.treetank.filelistener.exceptions.ResourceNotExistingException;
34  
35  import com.google.common.io.ByteArrayDataInput;
36  import com.google.common.io.ByteArrayDataOutput;
37  import com.google.common.io.ByteStreams;
38  
39  /**
40   * @author Andreas Rain
41   */
42  public class Filelistener {
43  
44  	private static final Logger LOGGER = LoggerFactory
45  			.getLogger(Filelistener.class);
46  
47  	/** The watchservice from java to watch the different paths. */
48  	private final WatchService mWatcher;
49  	/** A map consisting of the paths that are being watched. */
50  	private final Map<WatchKey, Path> mKeyPaths = new ConcurrentHashMap<WatchKey, Path>();
51  
52  	/**
53  	 * A map that consists of the storageName as the key and the watched folder
54  	 * path for the storage.
55  	 */
56  	private static Map<String, String> mFilelistenerToPaths;
57  	/**
58  	 * A map that consists of the storageName as the key and the session from
59  	 * treetank for the storage.
60  	 */
61  	private Map<String, ISession> mSessions;
62  	/**
63  	 * A map that consists of the storageName as the key and the page
64  	 * transaction from treetank for the storage.
65  	 */
66  	private Map<String, IFilelistenerWriteTrx> mTrx;
67  	/**
68  	 * This map holds all subdirectories and the registration with the
69  	 * watchservice
70  	 */
71  	private Map<String, List<String>> mSubDirectories;
72  
73  	/**
74  	 * A map consisting of the services that handle tasks for different
75  	 * resources parallel
76  	 */
77  	private Map<String, ExecutorService> mExecutorMap;
78  
79  	/** Map to keep track of filesystemnotifications */
80  	private Map<String, FilesystemNotification> mLockedFiles;
81  	/** Map to put FSN inside if a fsn is still locked in mLockedFiles */
82  	private Map<String, FilesystemNotification> mFsnOnHold;
83  	/** Observer class to be notified when a notification has been processed */
84  	private FilesystemNotificationObserver mObserver;
85  
86  	/**
87  	 * This thread is used, so the program does not get blocked by the
88  	 * watchservice.
89  	 */
90  	private volatile Thread mProcessingThread;
91  
92  	/**
93  	 * @throws IOException
94  	 */
95  	public Filelistener() throws IOException {
96  		this.mWatcher = FileSystems.getDefault().newWatchService();
97  		mSubDirectories = new HashMap<String, List<String>>();
98  		mExecutorMap = new HashMap<String, ExecutorService>();
99  		mLockedFiles = new HashMap<>();
100 		mFsnOnHold = new HashMap<>();
101 	}
102 
103 	/**
104 	 * Get the observer for this filelistener
105 	 * 
106 	 * @return observer
107 	 */
108 	public FilesystemNotificationObserver getObserver() {
109 		return mObserver;
110 	}
111 
112 	/**
113 	 * Set the observer for this filelistener
114 	 */
115 	public void setObserver(FilesystemNotificationObserver pObserver) {
116 		this.mObserver = pObserver;
117 	}
118 
119 	/**
120 	 * @param dir
121 	 * @throws IOException
122 	 */
123 	public void watchDir(File dir) throws IOException {
124 		Path p = dir.toPath();
125 		WatchKey key = p.register(mWatcher, ENTRY_CREATE, ENTRY_DELETE,
126 				ENTRY_MODIFY);
127 		mKeyPaths.put(key, p);
128 	}
129 
130 	/**
131 	 * Start listening to the defined folders.
132 	 * 
133 	 * @throws FileNotFoundException
134 	 * @throws ClassNotFoundException
135 	 * @throws IOException
136 	 * @throws ResourceNotExistingException
137 	 * @throws TTException
138 	 */
139 	public void startListening() throws FileNotFoundException,
140 			ClassNotFoundException, IOException, ResourceNotExistingException,
141 			TTException {
142 		mProcessingThread = new Thread() {
143 			public void run() {
144 				try {
145 					processFileNotifications();
146 				} catch (InterruptedException | TTException | IOException e) {
147 				}
148 			}
149 		};
150 		mProcessingThread.start();
151 
152 		initSessions();
153 	}
154 
155 	/**
156 	 * This method is used to initialize a session with treetank for every
157 	 * storage configuration thats in the database.
158 	 * 
159 	 * @throws FileNotFoundException
160 	 * @throws ClassNotFoundException
161 	 * @throws IOException
162 	 * @throws ResourceNotExistingException
163 	 * @throws TTException
164 	 */
165 	private void initSessions() throws FileNotFoundException,
166 			ClassNotFoundException, IOException, ResourceNotExistingException,
167 			TTException {
168 		Map<String, String> filelisteners = getFilelisteners();
169 		mSessions = new HashMap<String, ISession>();
170 		mTrx = new HashMap<String, IFilelistenerWriteTrx>();
171 
172 		if (filelisteners.isEmpty()) {
173 			return;
174 		}
175 
176 		for (Entry<String, String> e : filelisteners.entrySet()) {
177 			mSessions.put(e.getKey(), StorageManager.getSession(e.getKey()));
178 			mTrx.put(e.getKey(),
179 					new FilelistenerWriteTrx(mSessions.get(e.getKey())
180 							.beginBucketWtx(), mSessions.get(e.getKey())));
181 			mSubDirectories.put(e.getValue(), new ArrayList<String>());
182 			mExecutorMap.put(e.getValue(), Executors.newSingleThreadExecutor());
183 
184 			List<String> subDirs = mSubDirectories.get(e.getValue());
185 
186 			for (String s : mTrx.get(e.getKey()).getFilePaths()) {
187 				String fullFilePath = new StringBuilder().append(e.getValue())
188 						.append(File.separator).append(s).toString();
189 				subDirs.add(fullFilePath);
190 
191 				Path p = Paths.get(fullFilePath);
192 
193 				watchParents(p, e.getValue());
194 			}
195 		}
196 	}
197 
198 	/**
199 	 * Watch parent folders of this file until the root listener path has been
200 	 * reached.
201 	 * 
202 	 * @param p
203 	 * @param until
204 	 * @throws IOException
205 	 */
206 	private void watchParents(Path p, String until) throws IOException {
207 		if (p.getParent() != null && !until.equals(p.getParent().toString())) {
208 			watchDir(p.getParent().toFile());
209 			watchParents(p.getParent(), until);
210 		}
211 	}
212 
213 	/**
214 	 * Release transactions and session from treetank.
215 	 * 
216 	 * @throws TTException
217 	 */
218 	private void releaseSessions() throws TTException {
219 		if (mSessions == null) {
220 			return;
221 		}
222 
223 		// Closing all transactions.
224 		try {
225 			for (IFilelistenerWriteTrx trx : mTrx.values()) {
226 				trx.close();
227 			}
228 		} catch (IllegalStateException ise) {
229 			ise.printStackTrace();
230 		}
231 
232 		// Closing all storages aswell.
233 		for (ISession s : mSessions.values()) {
234 			s.close();
235 		}
236 	}
237 
238 	/**
239 	 * Shutdown listening to the defined folders and release all bonds to
240 	 * Treetank.
241 	 * 
242 	 * @throws TTException
243 	 * @throws IOException
244 	 */
245 	public void shutDownListener() throws TTException, IOException {
246 		for (ExecutorService s : mExecutorMap.values()) {
247 			s.shutdown();
248 
249 			while (!s.isTerminated()) {
250 				// Do nothing.
251 				try {
252 					Thread.sleep(1000);
253 				} catch (InterruptedException e) {
254 					LOGGER.error(e.getStackTrace().toString());
255 				}
256 			}
257 		}
258 
259 		Thread thr = mProcessingThread;
260 		if (thr != null) {
261 			thr.interrupt();
262 		}
263 
264 		mWatcher.close();
265 
266 		releaseSessions();
267 	}
268 
269 	/**
270 	 * In this method the notifications of the filesystem if anything changed in
271 	 * a folder that the system is listening to are being extracted and
272 	 * processed.
273 	 * 
274 	 * @throws InterruptedException
275 	 * @throws IOException
276 	 * @throws TTException
277 	 */
278 	private void processFileNotifications() throws InterruptedException,
279 			TTException, IOException {
280 		while (true) {
281 			WatchKey key = mWatcher.take();
282 			Path dir = mKeyPaths.get(key);
283 			for (WatchEvent<?> evt : key.pollEvents()) {
284 				WatchEvent.Kind<?> eventType = evt.kind();
285 				if (eventType == OVERFLOW)
286 					continue;
287 				Object o = evt.context();
288 				if (o instanceof Path) {
289 					Path path = dir.resolve((Path) evt.context());
290 					process(dir, path, eventType);
291 				}
292 			}
293 			key.reset();
294 
295 			processFsnOnHold();
296 		}
297 	}
298 
299 	private void processFsnOnHold() {
300 		// Maybe lockedfiles are finished
301 		// synchronized (mFsnOnHold) {
302 		// for (String s : mFsnOnHold
303 		// .keySet()) {
304 		// if (mLockedFiles.get(s) != null
305 		// && mLockedFiles.get(s)
306 		// .isFinished()) {
307 		// mLockedFiles.remove(s);
308 		//
309 		// System.out.println("fsnonhold "
310 		// + s);
311 		// ExecutorService service = mExecutorMap.get(s);
312 		// if (service != null && !service.isShutdown()) {
313 		// mLockedFiles.put(s, mFsnOnHold.get(s));
314 		// mFsnOnHold.remove(s);
315 		// service.submit(mLockedFiles.get(s));
316 		// }
317 		// }
318 		// }
319 		// }
320 
321 	}
322 
323 	/**
324 	 * This method is used to process the file system modifications.
325 	 * 
326 	 * @param dir
327 	 * @param file
328 	 * @param evtType
329 	 * @throws IOException
330 	 * @throws TTException
331 	 * @throws InterruptedException
332 	 */
333 	private void process(Path dir, Path file, WatchEvent.Kind<?> evtType)
334 			throws TTException, IOException, InterruptedException {
335 //		LOGGER.info("Processing " + file.getFileName() + " with event "
336 //				+ evtType);
337 		IFilelistenerWriteTrx trx = null;
338 		String rootPath = getListenerRootPath(dir);
339 
340 		String relativePath = file.toFile().getAbsolutePath();
341 		relativePath = relativePath.substring(
342 				getListenerRootPath(dir).length(), relativePath.length());
343 
344 		for (Entry<String, String> e : mFilelistenerToPaths.entrySet()) {
345 			if (e.getValue().equals(getListenerRootPath(dir))) {
346 				trx = mTrx.get(e.getKey());
347 			}
348 		}
349 
350 		if (file.toFile().isDirectory()) {
351 			if (evtType == ENTRY_CREATE) {
352 				addSubDirectory(dir, file);
353 				return;
354 			} else if (evtType == ENTRY_DELETE) {
355 				for (String s : trx.getFilePaths()) {
356 					if (s.contains(relativePath)) {
357 						trx.removeFile(s);
358 					}
359 				}
360 			}
361 		} else {
362 //			if (mLockedFiles.get(rootPath + File.separator
363 //					+ file.toFile().getName()) != null) {
364 //				if (mLockedFiles.get(
365 //						rootPath + File.separator + file.toFile().getName())
366 //						.isFinished()) {
367 //					ExecutorService s = mExecutorMap
368 //							.get(getListenerRootPath(dir));
369 //					if (s != null && !s.isShutdown()) {
370 //
371 //						FilesystemNotification n = new FilesystemNotification(
372 //								file.toFile(), relativePath, rootPath, evtType,
373 //								trx);
374 //						if (mObserver != null) {
375 //							n.addObserver(mObserver);
376 //						}
377 //						mFsnOnHold.remove(rootPath + File.separator
378 //								+ file.toFile().getName());
379 //						mLockedFiles.put(rootPath + File.separator
380 //								+ file.toFile().getName(), n);
381 //						s.submit(n);
382 //					}
383 //				} else {
384 //					FilesystemNotification n = new FilesystemNotification(
385 //							file.toFile(), relativePath, rootPath, evtType, trx);
386 //					if (mObserver != null) {
387 //						n.addObserver(mObserver);
388 //					}
389 //					mFsnOnHold.put(rootPath + File.separator
390 //							+ file.toFile().getName(), n);
391 //				}
392 //			} else {
393 				ExecutorService s = mExecutorMap.get(getListenerRootPath(dir));
394 				if (s != null && !s.isShutdown()) {
395 					FilesystemNotification n = new FilesystemNotification(
396 							file.toFile(), relativePath, rootPath, evtType, trx);
397 					if (mObserver != null) {
398 						n.addObserver(mObserver);
399 					}
400 //					mLockedFiles.put(rootPath + File.separator
401 //							+ file.toFile().getName(), n);
402 
403 					s.submit(n);
404 //				}
405 			}
406 		}
407 
408 	}
409 
410 	/**
411 	 * In this method a subdirectory is being added to the system and watched.
412 	 * 
413 	 * This is necessary since the {@link WatchService} doesn't support watching
414 	 * a folder with higher depths than 1.
415 	 * 
416 	 * @param root
417 	 * @param filePath
418 	 * @throws IOException
419 	 */
420 	private void addSubDirectory(Path root, Path filePath) throws IOException {
421 		String listener = getListenerRootPath(root);
422 
423 		List<String> listeners = mSubDirectories.get(listener);
424 
425 		if (listeners != null) {
426 			if (mSubDirectories.get(listener).contains(
427 					filePath.toAbsolutePath())) {
428 				return;
429 			} else {
430 				mSubDirectories.get(listener).add(filePath.toString());
431 			}
432 
433 			try {
434 				watchDir(filePath.toFile());
435 			} catch (IOException e) {
436 				throw new IOException("Could not watch the subdirectories.", e);
437 			}
438 		}
439 	}
440 
441 	/**
442 	 * This utility method allows you to get the root path for a subdirectory.
443 	 * 
444 	 * The root path is a directory that has been explicitly listened to and not
445 	 * just recursively.
446 	 * 
447 	 * @param root
448 	 * @return returns the root path as a String
449 	 */
450 	private String getListenerRootPath(Path root) {
451 		String listener = "";
452 
453 		for (String s : mFilelistenerToPaths.values()) {
454 			if (root.toString().contains(s)) {
455 				listener = s;
456 			}
457 		}
458 
459 		return listener;
460 	}
461 
462 	/**
463 	 * A utility method to get all filelisteners that are already defined and
464 	 * stored.
465 	 * 
466 	 * @return returns a map of relative paths to the folders as the keyset and
467 	 *         the resourcenames that point to the configurations in the
468 	 *         valueset.
469 	 * @throws FileNotFoundException
470 	 * @throws IOException
471 	 * @throws ClassNotFoundException
472 	 */
473 	public static Map<String, String> getFilelisteners()
474 			throws FileNotFoundException, IOException, ClassNotFoundException {
475 
476 		mFilelistenerToPaths = new HashMap<String, String>();
477 
478 		File listenerFilePaths = new File(StorageManager.ROOT_PATH
479 				+ File.separator + "mapping.data");
480 
481 		getFileListenersFromSystem(listenerFilePaths);
482 
483 		return mFilelistenerToPaths;
484 	}
485 
486 	/**
487 	 * Add a new filelistener to the system.
488 	 * 
489 	 * @param pResourcename
490 	 * @param pListenerPath
491 	 * @return return true if there has been a success.
492 	 * @throws FileNotFoundException
493 	 * @throws IOException
494 	 * @throws ClassNotFoundException
495 	 */
496 	public static boolean addFilelistener(String pResourcename,
497 			String pListenerPath) throws FileNotFoundException, IOException,
498 			ClassNotFoundException {
499 		mFilelistenerToPaths = new HashMap<String, String>();
500 
501 		File listenerFilePaths = new File(StorageManager.ROOT_PATH
502 				+ File.separator + "mapping.data");
503 
504 		getFileListenersFromSystem(listenerFilePaths);
505 
506 		mFilelistenerToPaths.put(pResourcename, pListenerPath);
507 
508 		ByteArrayDataOutput output = ByteStreams.newDataOutput();
509 		for (Entry<String, String> e : mFilelistenerToPaths.entrySet()) {
510 			output.write((e.getKey() + "\n").getBytes());
511 			output.write((e.getValue() + "\n").getBytes());
512 		}
513 
514 		java.nio.file.Files.write(listenerFilePaths.toPath(),
515 				output.toByteArray(), StandardOpenOption.TRUNCATE_EXISTING);
516 
517 		return true;
518 	}
519 
520 	/**
521 	 * You can remove a filelistener from the system identifying it with it's
522 	 * Storagename.
523 	 * 
524 	 * The listeners have to be shutdown to do this task.
525 	 * 
526 	 * @param pResourcename
527 	 * @return true if resource could be removed
528 	 * @throws IOException
529 	 * @throws TTException
530 	 * @throws ResourceNotExistingException
531 	 */
532 	public boolean removeFilelistener(String pResourcename) throws IOException,
533 			TTException, ResourceNotExistingException {
534 		mFilelistenerToPaths = new HashMap<String, String>();
535 
536 		File listenerFilePaths = new File(StorageManager.ROOT_PATH
537 				+ File.separator + "mapping.data");
538 
539 		getFileListenersFromSystem(listenerFilePaths);
540 
541 		mFilelistenerToPaths.remove(pResourcename);
542 
543 		StorageManager.removeResource(pResourcename);
544 
545 		ByteArrayDataOutput output = ByteStreams.newDataOutput();
546 		for (Entry<String, String> e : mFilelistenerToPaths.entrySet()) {
547 			output.write((e.getKey() + "\n").getBytes());
548 			output.write((e.getValue() + "\n").getBytes());
549 		}
550 
551 		java.nio.file.Files.write(listenerFilePaths.toPath(),
552 				output.toByteArray(), StandardOpenOption.TRUNCATE_EXISTING);
553 
554 		return true;
555 	}
556 
557 	/**
558 	 * This is a helper method that let's you initialize mFilelistenerToPaths
559 	 * with all the filelisteners that have been stored in the filesystem.
560 	 * 
561 	 * @param pListenerFilePaths
562 	 * @throws IOException
563 	 */
564 	private static void getFileListenersFromSystem(File pListenerFilePaths)
565 			throws IOException {
566 		if (!pListenerFilePaths.exists()) {
567 			java.nio.file.Files.createFile(pListenerFilePaths.toPath());
568 		} else {
569 			byte[] bytes = java.nio.file.Files.readAllBytes(pListenerFilePaths
570 					.toPath());
571 
572 			ByteArrayDataInput input = ByteStreams.newDataInput(bytes);
573 
574 			String key;
575 			while ((key = input.readLine()) != null) {
576 				String val = input.readLine();
577 
578 				mFilelistenerToPaths.put(key, val);
579 			}
580 		}
581 	}
582 
583 	/**
584 	 * Get the desired transaction. Primarily used by the workers to operate.
585 	 * 
586 	 * @param key
587 	 * @return the transaction for the given resource key
588 	 */
589 	public synchronized IFilelistenerWriteTrx getTrx(String key) {
590 		return mTrx.get(key);
591 	}
592 
593 }