基于JDK7,监控某文件夹拷贝文件,监控本身很简单用WatcherService实现,主要是如何确定文件拷贝完成,这里使用移动文件的思路
import org.apache.commons.io.FileUtils;import org.joda.time.DateTime;import org.joda.time.format.DateTimeFormat;import org.joda.time.format.DateTimeFormatter;import java.io.File;import java.io.IOException;import java.nio.file.*;import java.util.UUID;import static java.nio.file.StandardWatchEventKinds.*;/** * JDK7 NIO2 文件监控 */public class MyWatcherService { // 时间格式化Format private static final DateTimeFormatter dirNameFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH mm ss "); public static void main(String[] args) throws IOException, InterruptedException { String dir = "C:\\Users\\Administrator\\Desktop\\test"; Path path = Paths.get(dir); WatchService watcher = FileSystems.getDefault().newWatchService(); path.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); while (true) { /** * 拷贝文件会触发 一次ENTRY_CREATE 两次ENTRY_MODIFY(文件可能只触发一次ENTRY_MODIFY) * 拷贝文件会触发 一次ENTRY_CREATE 两次ENTRY_MODIFY 一次ENTRY_DELETE * 删除文件会触发 一次ENTRY_DELETE */ WatchKey key = watcher.take(); for (WatchEvent event : key.pollEvents()) { WatchEvent.Kind kind = event.kind(); if (kind == OVERFLOW) {//事件可能lost or discarded System.out.println("事件可能lost or discarded"); continue; } WatchEvente = (WatchEvent ) event; String kindName = kind.name(); Path fileName = e.context(); System.out.printf("Event %s has happened,which fileName is %s%n", kindName, fileName); if ("ENTRY_MODIFY".equals(kindName)) { rename(dir, fileName.toString()); } } // 重置 key 如果失败结束监控 if (!key.reset()) { break; } } } /** * 通过修改文件名判断文件拷贝完成 */ public static void rename(String dir, String fileName) { File src = new File(dir + File.separator + fileName); String tmp = "C:\\Users\\Administrator\\Desktop\\test2"; File dest = new File(tmp + File.separator + DateTime.now().toString(dirNameFormat) + UUID.randomUUID().toString().replaceAll("-", "") + File.separator + fileName); try { // 未拷贝完成移动失败 FileUtils.moveFile(src, dest); } catch (Exception e) { FileUtils.deleteQuietly(dest.getParentFile()); } if (dest.exists()) { System.out.printf("file %s copy over , you can do next with %s%n", src.getAbsolutePath(), dest.getAbsolutePath()); } }}
完善版本
使用中发现,如果一次导入大量数据会存在有些event来不及处理就过去的情况,从结果来看就是文件夹中留存一些文件没有处理,进一步的解决方法是采用阻塞队列,文件监控进程只负责把文件名放到队列中
1.主程序启动一个文件监控进程,一个文件处理进程
import java.util.concurrent.BlockingQueue;import java.util.concurrent.LinkedBlockingQueue;/** ** 功能说明:启动两个线程一个监控文件夹改变 一个处理文件 **/public class MyWatcherService2 { public static void main(String[] args) { // 创建单向队列queue DirWatch进程将监控到的文件放到队列中 Processor检查处理队列中的文件 BlockingQueuequeue = new LinkedBlockingQueue (); DirWatch watch = new DirWatch(queue); Thread watchThread = new Thread(watch); watchThread.start(); Processor processor = new Processor(queue); Thread processorThread = new Thread(processor); processorThread.start(); while (true) { try { Thread.sleep(9999999); } catch (InterruptedException e) { e.printStackTrace(); } } }}
2.文件监控进程
import java.nio.file.*;import java.util.concurrent.BlockingQueue;import static java.nio.file.StandardWatchEventKinds.*;/** ** 功能说明:通过NIO 监控文件夹并放入队列中 **/public class DirWatch implements Runnable { private BlockingQueuequeue; public DirWatch(BlockingQueue queue) { this.queue = queue; } @Override public void run() { String sourceDir = "D:\\bigdata\\src"; try { // 对文件夹进行监控 Path path = Paths.get(sourceDir); WatchService watcher = FileSystems.getDefault().newWatchService(); path.register(watcher, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); while (true) { WatchKey key = watcher.take(); for (WatchEvent event : key.pollEvents()) { WatchEvent.Kind kind = event.kind(); if (kind == OVERFLOW) { System.out.println("事件可能lost or discarded"); continue; } WatchEvent e = (WatchEvent ) event; String kindName = kind.name(); Path fileName = e.context();// System.out.printf("Event %s has happened,which fileName is %s%n", kindName, fileName); if ("ENTRY_MODIFY".equals(kindName)) { // 放到队列中 如果队列满抛出异常 证明程序逻辑出现问题 理论上不可能满 queue.add(fileName.toString()); } } // 重置 key 如果失败结束监控 if (!key.reset()) { throw new RuntimeException("重置Key失败,结束监控");// break; } } } catch (Exception e) { System.out.println("监控异常, 停止程序"); // 无法处理QAR数据 结束程序 System.exit(1); } }}
3.文件处理进程
import org.apache.commons.io.FileUtils;import org.joda.time.DateTime;import org.joda.time.format.DateTimeFormat;import org.joda.time.format.DateTimeFormatter;import java.io.File;import java.util.UUID;import java.util.concurrent.BlockingQueue;/** ** 功能说明:通过移动文件的方法判断文件拷贝完成 并处理文件 **/public class Processor implements Runnable { // 时间格式化Format private DateTimeFormatter dirNameFormat = DateTimeFormat.forPattern("yyyy-MM-dd HH mm ss "); private BlockingQueuequeue; public Processor(BlockingQueue queue) { this.queue = queue; } @Override public void run() { String sourceDir = "D:\\bigdata\\src"; String resDir = "D:\\bigdata\\res"; try { while (true) { String fileName = queue.take(); File src = new File(sourceDir + File.separator + fileName); // 再一次移动防止提交重名文件覆盖正在处理的文件 File dest = new File(resDir + File.separator + DateTime.now().toString(dirNameFormat) + UUID.randomUUID().toString().replaceAll("-", "") + File.separator + fileName); try { // 未拷贝完成不能移动 FileUtils.moveFile(src, dest); } catch (Exception e) { // 删除建立的空目录 FileUtils.deleteQuietly(dest.getParentFile()); } if (dest.exists()) { try { System.out.printf("file %s copy over , you can do next with %s%n", src.getAbsolutePath(), dest.getAbsolutePath()); } catch (Exception e) { e.printStackTrace(); } } } } catch (Exception e) { System.out.println("处理异常, 停止程序"); // 无法处理QAR数据 结束程序 System.exit(1); } }}