Flume re-reads a file after it is renamed

Problem

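A logging framework such as log4j writes logs to a file, and that file is periodically rolled over by renaming it. After the rename, Flume reads the file again, so the same log lines are collected twice.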

Analysis

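Flume's TAILDIR source identifies a file by the following two values, which are also the fields it records in its position file:

  • inode: the unique ID of a file on Linux (unique within one file system)
  • file: the absolute path of the file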

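A rename leaves the inode unchanged but changes the path. Because the path no longer matches, Flume treats the renamed file as a new file and reads it again from the beginning. This happens only when the renamed file still matches the source's file-group pattern.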
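You can verify on a Linux machine that the inode survives a rename by reading it before and after the move. The sketch below assumes a Unix-like file system, because the `unix:ino` attribute is not available on Windows. It requires Java 11+ for `Files.writeString`, and the file names `app.log` and `app.log.1` are made up for illustration. The TAILDIR source obtains the inode through the same `Files.getAttribute(..., "unix:ino")` call.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class InodeAfterRename {

    // Reads the inode number; "unix:ino" exists only on Unix-like systems.
    static long inode(Path p) throws IOException {
        return (long) Files.getAttribute(p, "unix:ino");
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("roll-demo");
        Path active = dir.resolve("app.log");    // hypothetical active log file
        Path rolled = dir.resolve("app.log.1");  // hypothetical rolled-over name

        Files.writeString(active, "line 1\nline 2\n");
        long before = inode(active);

        // Simulate a rolling appender: rename the active file, then start a new one.
        Files.move(active, rolled, StandardCopyOption.ATOMIC_MOVE);
        Files.writeString(active, "line 3\n");

        System.out.println("inode before rename: " + before);
        System.out.println("inode after rename:  " + inode(rolled));
        System.out.println("renamed file keeps its inode: " + (inode(rolled) == before));
        System.out.println("new app.log gets another inode: " + (inode(active) != before));
    }
}
```

The two "keeps/gets" lines should both print `true`. The old inode is still linked as `app.log.1`, so the freshly created `app.log` cannot reuse it.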

Solution

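Download the Flume source package, open the flume-taildir-source module, and change it so that a file is identified by its inode alone and the path is no longer compared.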

Modify TailFile.java:

public boolean updatePos(String path, long inode, long pos) throws IOException {
    //if (this.inode == inode && this.path.equals(path)) { // original check, commented out
    if (this.inode == inode) {
        setPos(pos);
        updateFilePos(pos);
        logger.info("Updated position, file: " + path + ", inode: " + inode + ", pos: " + pos);
        return true;
    }
    return false;
}
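In the code shown here, the only caller that can pass a path different from the tracked one is the position-file loader. The truncation branch in `updateTailFiles` passes `tf.getPath()`. At startup, the reader first scans the matching files and then restores each saved `inode`/`pos`/`file` entry through `updatePos`. The sketch below is a simplified, hypothetical model of that restore step, not Flume's code. `Tracked` stands in for `TailFile`, `Saved` for one position-file entry, and `restore` for the loading loop. It requires Java 16+ for `record`, and it assumes the startup scan opened the renamed file at position 0.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PositionRestoreModel {

    // Hypothetical stand-in for Flume's TailFile, reduced to the fields the check uses.
    static final class Tracked {
        final String path;
        final long inode;
        long pos;

        Tracked(String path, long inode, long pos) {
            this.path = path;
            this.inode = inode;
            this.pos = pos;
        }

        // checkPath=true mirrors the original updatePos; false mirrors the patched one.
        boolean updatePos(String savedPath, long savedInode, long savedPos, boolean checkPath) {
            if (this.inode == savedInode && (!checkPath || this.path.equals(savedPath))) {
                this.pos = savedPos;
                return true;
            }
            return false;
        }
    }

    // One entry of the position file: inode, pos, file.
    record Saved(long inode, long pos, String file) {}

    // Applies saved positions to the files found by the startup scan (keyed by inode).
    static Map<Long, Tracked> restore(List<Tracked> found, List<Saved> saved, boolean checkPath) {
        Map<Long, Tracked> byInode = new HashMap<>();
        for (Tracked t : found) {
            byInode.put(t.inode, t);
        }
        for (Saved s : saved) {
            Tracked t = byInode.get(s.inode());
            if (t == null || !t.updatePos(s.file(), s.inode(), s.pos(), checkPath)) {
                System.out.println("saved position discarded for " + s.file());
            }
        }
        return byInode;
    }

    public static void main(String[] args) {
        // Before stopping, 500 bytes of /logs/app.log (inode 42) had been read;
        // while Flume was down, the file was rolled to /logs/app.log.1.
        List<Saved> saved = List.of(new Saved(42, 500, "/logs/app.log"));

        for (boolean checkPath : new boolean[] {true, false}) {
            // The startup scan opened the renamed file at position 0.
            List<Tracked> found = List.of(new Tracked("/logs/app.log.1", 42, 0));
            long pos = restore(found, saved, checkPath).get(42L).pos;
            System.out.println((checkPath ? "inode + path" : "inode only") + ": resume at byte " + pos);
        }
    }
}
```

With the path check, the saved offset is discarded and reading resumes at byte 0, which re-sends the file. With the inode-only check, it resumes at byte 500.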

Modify ReliableTaildirEventReader.java:

public List<Long> updateTailFiles(boolean skipToEnd) throws IOException {
        updateTime = System.currentTimeMillis();
        List<Long> updatedInodes = Lists.newArrayList();

        for (TaildirMatcher taildir : taildirCache) {
            Map<String, String> headers = headerTable.row(taildir.getFileGroup());

            for (File f : taildir.getMatchingFiles()) {
                long inode;
                try {
                    inode = getInode(f);
                } catch (NoSuchFileException e) {
                    logger.info("File has been deleted in the meantime: " + e.getMessage());
                    continue;
                }
                TailFile tf = tailFiles.get(inode);
                // original check, commented out
                //if (tf == null || !tf.getPath().equals(f.getAbsolutePath())) {
                if (tf == null) {
                    long startPos = skipToEnd ? f.length() : 0;
                    tf = openFile(f, headers, inode, startPos);
                } else {
                    boolean updated = tf.getLastUpdated() < f.lastModified() || tf.getPos() != f.length();
                    if (updated) {
                        if (tf.getRaf() == null) {
                            tf = openFile(f, headers, inode, tf.getPos());
                        }
                        if (f.length() < tf.getPos()) {
                            logger.info("Pos " + tf.getPos() + " is larger than file size! "
                                    + "Restarting from pos 0, file: " + tf.getPath() + ", inode: " + inode);
                            tf.updatePos(tf.getPath(), inode, 0);
                        }
                    }
                    tf.setNeedTail(updated);
                }
                tailFiles.put(inode, tf);
                updatedInodes.add(inode);
            }
        }
        return updatedInodes;
    }
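The effect of this second change can be illustrated with a small simulation of one scan. `TrackedFile`, `scan`, and the `Map<String, Long>` that stands in for the directory listing (path to inode) are hypothetical names for this sketch. The model covers only the decision `updateTailFiles` makes for each matched file: open it as new at position 0 (assuming `skipToEnd` is off), or keep the existing read position. It ignores the reopen and truncation handling in the else-branch and requires Java 16+ for `record`.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class ScanModel {

    // Hypothetical stand-in for TailFile: the path it was opened under and its read position.
    record TrackedFile(String path, long pos) {}

    /**
     * One scan over the matched files; `listing` maps absolute path to inode.
     * checkPath=true follows the original condition (tf == null || !tf.getPath().equals(path)),
     * checkPath=false follows the patched one (tf == null).
     */
    static Map<Long, TrackedFile> scan(Map<Long, TrackedFile> tracked,
                                       Map<String, Long> listing,
                                       boolean checkPath) {
        Map<Long, TrackedFile> result = new HashMap<>(tracked);
        for (Map.Entry<String, Long> e : listing.entrySet()) {
            String path = e.getKey();
            long inode = e.getValue();
            TrackedFile tf = result.get(inode);
            boolean treatAsNew = tf == null || (checkPath && !tf.path().equals(path));
            if (treatAsNew) {
                result.put(inode, new TrackedFile(path, 0)); // read from the beginning
            }
            // Otherwise the existing entry, and its position, is kept unchanged.
        }
        return result;
    }

    public static void main(String[] args) {
        // Before rolling: /logs/app.log (inode 42) has been read up to byte 500.
        Map<Long, TrackedFile> tracked = Map.of(42L, new TrackedFile("/logs/app.log", 500));

        // After rolling: inode 42 is now app.log.1, and a new app.log has inode 43.
        Map<String, Long> listing = new LinkedHashMap<>();
        listing.put("/logs/app.log.1", 42L);
        listing.put("/logs/app.log", 43L);

        System.out.println("original: " + scan(tracked, listing, true).get(42L));
        System.out.println("patched:  " + scan(tracked, listing, false).get(42L));
    }
}
```

Under the original condition, inode 42 is reset to position 0 and its contents are sent again. Under the patched condition, it keeps position 500, and the new `app.log` (inode 43) is still picked up as a new file. In the model, the patched entry keeps the old path. In Flume itself, the stored path is likewise refreshed only when the else-branch reopens the file through `openFile(f, ...)`.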

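Build the module into a jar and use it to replace flume-taildir-source-1.9.0.jar in the flume/lib directory. Build from the source of the same Flume version you are running.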

Reference:
https://blog.csdn.net/Bryce_Loski/article/details/106945967


Original article: https://malaoshi.top/show_1IX4SIB65w4G.html