1 package org.treetank.filelistener.file;
2
3 import static java.nio.file.StandardWatchEventKinds.ENTRY_CREATE;
4 import static java.nio.file.StandardWatchEventKinds.ENTRY_DELETE;
5 import static java.nio.file.StandardWatchEventKinds.ENTRY_MODIFY;
6 import static java.nio.file.StandardWatchEventKinds.OVERFLOW;
7
8 import java.io.File;
9 import java.io.FileNotFoundException;
10 import java.io.IOException;
11 import java.nio.file.FileSystems;
12 import java.nio.file.Path;
13 import java.nio.file.Paths;
14 import java.nio.file.StandardOpenOption;
15 import java.nio.file.WatchEvent;
16 import java.nio.file.WatchKey;
17 import java.nio.file.WatchService;
18 import java.util.ArrayList;
19 import java.util.HashMap;
20 import java.util.List;
21 import java.util.Map;
22 import java.util.Map.Entry;
23 import java.util.concurrent.ConcurrentHashMap;
24 import java.util.concurrent.ExecutorService;
25 import java.util.concurrent.Executors;
26
27 import org.slf4j.Logger;
28 import org.slf4j.LoggerFactory;
29 import org.treetank.access.FilelistenerWriteTrx;
30 import org.treetank.api.IFilelistenerWriteTrx;
31 import org.treetank.api.ISession;
32 import org.treetank.exception.TTException;
33 import org.treetank.filelistener.exceptions.ResourceNotExistingException;
34
35 import com.google.common.io.ByteArrayDataInput;
36 import com.google.common.io.ByteArrayDataOutput;
37 import com.google.common.io.ByteStreams;
38
39
40
41
42 public class Filelistener {
43
44 private static final Logger LOGGER = LoggerFactory
45 .getLogger(Filelistener.class);
46
47
48 private final WatchService mWatcher;
49
50 private final Map<WatchKey, Path> mKeyPaths = new ConcurrentHashMap<WatchKey, Path>();
51
52
53
54
55
56 private static Map<String, String> mFilelistenerToPaths;
57
58
59
60
61 private Map<String, ISession> mSessions;
62
63
64
65
66 private Map<String, IFilelistenerWriteTrx> mTrx;
67
68
69
70
71 private Map<String, List<String>> mSubDirectories;
72
73
74
75
76
77 private Map<String, ExecutorService> mExecutorMap;
78
79
80 private Map<String, FilesystemNotification> mLockedFiles;
81
82 private Map<String, FilesystemNotification> mFsnOnHold;
83
84 private FilesystemNotificationObserver mObserver;
85
86
87
88
89
90 private volatile Thread mProcessingThread;
91
92
93
94
95 public Filelistener() throws IOException {
96 this.mWatcher = FileSystems.getDefault().newWatchService();
97 mSubDirectories = new HashMap<String, List<String>>();
98 mExecutorMap = new HashMap<String, ExecutorService>();
99 mLockedFiles = new HashMap<>();
100 mFsnOnHold = new HashMap<>();
101 }
102
103
104
105
106
107
108 public FilesystemNotificationObserver getObserver() {
109 return mObserver;
110 }
111
112
113
114
115 public void setObserver(FilesystemNotificationObserver pObserver) {
116 this.mObserver = pObserver;
117 }
118
119
120
121
122
123 public void watchDir(File dir) throws IOException {
124 Path p = dir.toPath();
125 WatchKey key = p.register(mWatcher, ENTRY_CREATE, ENTRY_DELETE,
126 ENTRY_MODIFY);
127 mKeyPaths.put(key, p);
128 }
129
130
131
132
133
134
135
136
137
138
139 public void startListening() throws FileNotFoundException,
140 ClassNotFoundException, IOException, ResourceNotExistingException,
141 TTException {
142 mProcessingThread = new Thread() {
143 public void run() {
144 try {
145 processFileNotifications();
146 } catch (InterruptedException | TTException | IOException e) {
147 }
148 }
149 };
150 mProcessingThread.start();
151
152 initSessions();
153 }
154
155
156
157
158
159
160
161
162
163
164
165 private void initSessions() throws FileNotFoundException,
166 ClassNotFoundException, IOException, ResourceNotExistingException,
167 TTException {
168 Map<String, String> filelisteners = getFilelisteners();
169 mSessions = new HashMap<String, ISession>();
170 mTrx = new HashMap<String, IFilelistenerWriteTrx>();
171
172 if (filelisteners.isEmpty()) {
173 return;
174 }
175
176 for (Entry<String, String> e : filelisteners.entrySet()) {
177 mSessions.put(e.getKey(), StorageManager.getSession(e.getKey()));
178 mTrx.put(e.getKey(),
179 new FilelistenerWriteTrx(mSessions.get(e.getKey())
180 .beginBucketWtx(), mSessions.get(e.getKey())));
181 mSubDirectories.put(e.getValue(), new ArrayList<String>());
182 mExecutorMap.put(e.getValue(), Executors.newSingleThreadExecutor());
183
184 List<String> subDirs = mSubDirectories.get(e.getValue());
185
186 for (String s : mTrx.get(e.getKey()).getFilePaths()) {
187 String fullFilePath = new StringBuilder().append(e.getValue())
188 .append(File.separator).append(s).toString();
189 subDirs.add(fullFilePath);
190
191 Path p = Paths.get(fullFilePath);
192
193 watchParents(p, e.getValue());
194 }
195 }
196 }
197
198
199
200
201
202
203
204
205
206 private void watchParents(Path p, String until) throws IOException {
207 if (p.getParent() != null && !until.equals(p.getParent().toString())) {
208 watchDir(p.getParent().toFile());
209 watchParents(p.getParent(), until);
210 }
211 }
212
213
214
215
216
217
218 private void releaseSessions() throws TTException {
219 if (mSessions == null) {
220 return;
221 }
222
223
224 try {
225 for (IFilelistenerWriteTrx trx : mTrx.values()) {
226 trx.close();
227 }
228 } catch (IllegalStateException ise) {
229 ise.printStackTrace();
230 }
231
232
233 for (ISession s : mSessions.values()) {
234 s.close();
235 }
236 }
237
238
239
240
241
242
243
244
245 public void shutDownListener() throws TTException, IOException {
246 for (ExecutorService s : mExecutorMap.values()) {
247 s.shutdown();
248
249 while (!s.isTerminated()) {
250
251 try {
252 Thread.sleep(1000);
253 } catch (InterruptedException e) {
254 LOGGER.error(e.getStackTrace().toString());
255 }
256 }
257 }
258
259 Thread thr = mProcessingThread;
260 if (thr != null) {
261 thr.interrupt();
262 }
263
264 mWatcher.close();
265
266 releaseSessions();
267 }
268
269
270
271
272
273
274
275
276
277
278 private void processFileNotifications() throws InterruptedException,
279 TTException, IOException {
280 while (true) {
281 WatchKey key = mWatcher.take();
282 Path dir = mKeyPaths.get(key);
283 for (WatchEvent<?> evt : key.pollEvents()) {
284 WatchEvent.Kind<?> eventType = evt.kind();
285 if (eventType == OVERFLOW)
286 continue;
287 Object o = evt.context();
288 if (o instanceof Path) {
289 Path path = dir.resolve((Path) evt.context());
290 process(dir, path, eventType);
291 }
292 }
293 key.reset();
294
295 processFsnOnHold();
296 }
297 }
298
299 private void processFsnOnHold() {
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321 }
322
323
324
325
326
327
328
329
330
331
332
333 private void process(Path dir, Path file, WatchEvent.Kind<?> evtType)
334 throws TTException, IOException, InterruptedException {
335
336
337 IFilelistenerWriteTrx trx = null;
338 String rootPath = getListenerRootPath(dir);
339
340 String relativePath = file.toFile().getAbsolutePath();
341 relativePath = relativePath.substring(
342 getListenerRootPath(dir).length(), relativePath.length());
343
344 for (Entry<String, String> e : mFilelistenerToPaths.entrySet()) {
345 if (e.getValue().equals(getListenerRootPath(dir))) {
346 trx = mTrx.get(e.getKey());
347 }
348 }
349
350 if (file.toFile().isDirectory()) {
351 if (evtType == ENTRY_CREATE) {
352 addSubDirectory(dir, file);
353 return;
354 } else if (evtType == ENTRY_DELETE) {
355 for (String s : trx.getFilePaths()) {
356 if (s.contains(relativePath)) {
357 trx.removeFile(s);
358 }
359 }
360 }
361 } else {
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393 ExecutorService s = mExecutorMap.get(getListenerRootPath(dir));
394 if (s != null && !s.isShutdown()) {
395 FilesystemNotification n = new FilesystemNotification(
396 file.toFile(), relativePath, rootPath, evtType, trx);
397 if (mObserver != null) {
398 n.addObserver(mObserver);
399 }
400
401
402
403 s.submit(n);
404
405 }
406 }
407
408 }
409
410
411
412
413
414
415
416
417
418
419
420 private void addSubDirectory(Path root, Path filePath) throws IOException {
421 String listener = getListenerRootPath(root);
422
423 List<String> listeners = mSubDirectories.get(listener);
424
425 if (listeners != null) {
426 if (mSubDirectories.get(listener).contains(
427 filePath.toAbsolutePath())) {
428 return;
429 } else {
430 mSubDirectories.get(listener).add(filePath.toString());
431 }
432
433 try {
434 watchDir(filePath.toFile());
435 } catch (IOException e) {
436 throw new IOException("Could not watch the subdirectories.", e);
437 }
438 }
439 }
440
441
442
443
444
445
446
447
448
449
450 private String getListenerRootPath(Path root) {
451 String listener = "";
452
453 for (String s : mFilelistenerToPaths.values()) {
454 if (root.toString().contains(s)) {
455 listener = s;
456 }
457 }
458
459 return listener;
460 }
461
462
463
464
465
466
467
468
469
470
471
472
473 public static Map<String, String> getFilelisteners()
474 throws FileNotFoundException, IOException, ClassNotFoundException {
475
476 mFilelistenerToPaths = new HashMap<String, String>();
477
478 File listenerFilePaths = new File(StorageManager.ROOT_PATH
479 + File.separator + "mapping.data");
480
481 getFileListenersFromSystem(listenerFilePaths);
482
483 return mFilelistenerToPaths;
484 }
485
486
487
488
489
490
491
492
493
494
495
496 public static boolean addFilelistener(String pResourcename,
497 String pListenerPath) throws FileNotFoundException, IOException,
498 ClassNotFoundException {
499 mFilelistenerToPaths = new HashMap<String, String>();
500
501 File listenerFilePaths = new File(StorageManager.ROOT_PATH
502 + File.separator + "mapping.data");
503
504 getFileListenersFromSystem(listenerFilePaths);
505
506 mFilelistenerToPaths.put(pResourcename, pListenerPath);
507
508 ByteArrayDataOutput output = ByteStreams.newDataOutput();
509 for (Entry<String, String> e : mFilelistenerToPaths.entrySet()) {
510 output.write((e.getKey() + "\n").getBytes());
511 output.write((e.getValue() + "\n").getBytes());
512 }
513
514 java.nio.file.Files.write(listenerFilePaths.toPath(),
515 output.toByteArray(), StandardOpenOption.TRUNCATE_EXISTING);
516
517 return true;
518 }
519
520
521
522
523
524
525
526
527
528
529
530
531
532 public boolean removeFilelistener(String pResourcename) throws IOException,
533 TTException, ResourceNotExistingException {
534 mFilelistenerToPaths = new HashMap<String, String>();
535
536 File listenerFilePaths = new File(StorageManager.ROOT_PATH
537 + File.separator + "mapping.data");
538
539 getFileListenersFromSystem(listenerFilePaths);
540
541 mFilelistenerToPaths.remove(pResourcename);
542
543 StorageManager.removeResource(pResourcename);
544
545 ByteArrayDataOutput output = ByteStreams.newDataOutput();
546 for (Entry<String, String> e : mFilelistenerToPaths.entrySet()) {
547 output.write((e.getKey() + "\n").getBytes());
548 output.write((e.getValue() + "\n").getBytes());
549 }
550
551 java.nio.file.Files.write(listenerFilePaths.toPath(),
552 output.toByteArray(), StandardOpenOption.TRUNCATE_EXISTING);
553
554 return true;
555 }
556
557
558
559
560
561
562
563
564 private static void getFileListenersFromSystem(File pListenerFilePaths)
565 throws IOException {
566 if (!pListenerFilePaths.exists()) {
567 java.nio.file.Files.createFile(pListenerFilePaths.toPath());
568 } else {
569 byte[] bytes = java.nio.file.Files.readAllBytes(pListenerFilePaths
570 .toPath());
571
572 ByteArrayDataInput input = ByteStreams.newDataInput(bytes);
573
574 String key;
575 while ((key = input.readLine()) != null) {
576 String val = input.readLine();
577
578 mFilelistenerToPaths.put(key, val);
579 }
580 }
581 }
582
583
584
585
586
587
588
589 public synchronized IFilelistenerWriteTrx getTrx(String key) {
590 return mTrx.get(key);
591 }
592
593 }