Flume log collection: which kinds of logs does Flume support?

Flume is a highly available, highly reliable, distributed system provided by Cloudera for collecting, aggregating, and transporting massive amounts of log data. It supports custom data senders plugged into the logging system to collect data, can apply simple processing to that data, and can write it to a variety of (customizable) data receivers.
Flume began as Cloudera's log collection system and is currently an Apache incubator project. Out of the box it can collect data from sources such as the console, RPC (Thrift-RPC), text files, tail (UNIX tail), syslog (supporting both TCP and UDP), and exec (command execution).
Flume adopts a multi-Master design. To keep configuration data consistent, Flume introduced ZooKeeper to store the configuration; ZooKeeper guarantees the consistency and high availability of that data and notifies the Flume Master nodes when the configuration changes, while the Masters synchronize among themselves using a gossip protocol.
Flume's structure has three main parts: the source, the channel, and the sink. The source is the origin, responsible for collecting logs; the channel is the conduit, responsible for transport and temporary buffering; the sink is the destination, where the collected logs are stored. In an actual collection job you choose and configure the source, channel, and sink types that suit the kind of logs being collected and the storage requirements, so that the logs are collected and persisted.
A Flume log collection plan
Operating system: Linux
Log update pattern: new log files are created, and existing logs are appended to at the end
Timing requirements
Collection period: short (within one day)
Collecting log files with Flume is quite simple: pick a suitable source, channel, and sink and configure them; for special requirements you can also do your own secondary development.
Concretely: configure an agent for your needs by choosing an appropriate source and sink, then start the agent and begin collecting logs. A minimal sketch is shown below.
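As an illustration only (the agent, source, channel, and sink names a1, r1, c1, and k1 are arbitrary, as are the port values), a minimal agent that listens on a netcat source and writes every event to Flume's own log might look like this:
# Minimal illustrative agent: netcat source -> memory channel -> logger sink
a1.sources = r1
a1.channels = c1
a1.sinks = k1
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1
a1.channels.c1.type = memory
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
Such an agent is then started with something like flume-ng agent --conf conf --conf-file example.conf --name a1.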
Flume offers many source types so that most log collection needs are covered; commonly used sources include avro, exec, netcat, spooling-directory, and syslog. Their exact scope and configuration options are documented in the Flume user guide.
The channel is less prominent than the source and sink, but it is still an essential component. The most commonly used channel is the memory channel; other types, such as the JDBC channel, file channel, and custom channels, also exist.
Flume likewise has many kinds of sinks; the commonly used ones include avro, logger, HDFS, HBase, and file-roll, and there are others such as thrift, IRC, and custom sinks.
Processing logs with Flume
Flume does more than collect logs; it can also apply simple processing to them. At the source, interceptors can extract and filter important content from the log body; at the channel, events can be classified by header so that different kinds of logs go into different channels; and at the sink, a regex serializer can further filter and classify the body.
Flume Source Interceptors
Flume can use interceptors to extract important information and add it to the event header. Commonly used interceptors add a timestamp, the host name, or a UUID; you can also write a regex-based interceptor that filters out log content in a particular format to meet special requirements.
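For example, a source could be given a timestamp and a host interceptor. This is only a sketch: the interceptor names i1 and i2 and the header name hostname are illustrative assumptions.
a1.sources.r1.interceptors = i1 i2
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = host
a1.sources.r1.interceptors.i2.hostHeader = hostname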
Flume Channel Selectors
Flume can route different logs into different channels in two ways: replicating and multiplexing. Replicating does no grouping at all: every log event is copied to every channel, with no distinction between channels. Multiplexing classifies events by a designated header and, according to the classification rules, sends different kinds of logs into different channels, giving a deliberate first level of classification.
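A multiplexing selector might be configured as sketched below; the header name logtype, its values, and the channel names are made up for illustration. (Replicating is the default, selector.type = replicating, and needs no mapping.)
a1.sources.r1.channels = c1 c2 c3
a1.sources.r1.selector.type = multiplexing
a1.sources.r1.selector.header = logtype
a1.sources.r1.selector.mapping.access = c1
a1.sources.r1.selector.mapping.error = c2
a1.sources.r1.selector.default = c3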
Flume Sink Processors
Flume can also process logs at the sink. Common sink processors include custom, failover, load balancing, and default. As with interceptors, you can apply a regex serializer to filter the log content for special needs; unlike interceptors, content filtered out at the sink by the regex serializer is not added to the header, so the header does not become bloated.
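Sink processors are configured on a sink group. The sketch below shows a failover processor over two sinks; the group and sink names and the priority values are illustrative assumptions.
a1.sinkgroups = g1
a1.sinkgroups.g1.sinks = k1 k2
a1.sinkgroups.g1.processor.type = failover
a1.sinkgroups.g1.processor.priority.k1 = 10
a1.sinkgroups.g1.processor.priority.k2 = 5
a1.sinkgroups.g1.processor.maxpenalty = 10000
Using processor.type = load_balance instead gives round-robin or random load balancing across the sinks.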
Common sources
avro source
The avro source listens on a given port and collects the logs sent to it; you must specify the host IP to bind to and the port number to listen on. A concrete example:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
exec source
The exec source reads logs by running a specified command; you supply a shell command (such as tail) that reads the log. A concrete example:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
spooling-directory source
The spooling-directory source reads logs from a directory: you specify a folder and it reads every file in that folder. Note that files must not be modified while they are being read, and their names must not be changed either. A concrete example:
agent-1.channels = ch-1
agent-1.sources = src-1
agent-1.sources.src-1.type = spooldir
agent-1.sources.src-1.channels = ch-1
agent-1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
agent-1.sources.src-1.fileHeader = true
syslog source
The syslog source reads system logs via the syslog protocol, in either TCP or UDP mode; you specify the IP and port to listen on. Here is a UDP example:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogudp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1
Common channels
Flume does not have many channel types; the most commonly used is the memory channel. An example:
a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000
Common sinks
logger sink
The logger sink, as its name suggests, writes the collected logs into Flume's own log; it is very simple but extremely useful.
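A minimal logger-sink configuration, in the same spirit as the other examples (the names are illustrative):
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1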
avro sink
The avro sink sends the received logs to a specified host and port, where the next hop of a cascaded agent chain can collect and receive them; you must specify the destination IP and port. An example:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
file roll sink
The file_roll sink writes the logs collected within a period of time into a specified file. You specify a directory and a period and start the agent; a file is created in that directory and all logs collected during the period are written into it, until the next period starts and a new file is created, and so on. A concrete example:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
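The rolling period of file_roll defaults to 30 seconds; it can be changed with sink.rollInterval, for example (the value is illustrative):
a1.sinks.k1.sink.rollInterval = 3600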
hdfs sink
The hdfs sink is similar to file_roll in that both write the collected logs into newly created files. The difference is that file_roll stores its files on the local file system, whereas the hdfs sink stores them on the distributed file system HDFS; in addition, the hdfs sink can roll to a new file based on time, file size, or the number of collected events. A concrete example:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
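To roll by time, size, or event count as described above, the hdfs sink exposes hdfs.rollInterval, hdfs.rollSize, and hdfs.rollCount (a value of 0 disables that criterion); the values below are illustrative only:
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 0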
hbase sink
HBase is a database that can store the logs. You specify the table name and column family that will hold them, and the agent then inserts the collected log events into the database one by one. An example:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
Flume Taildir Source: collecting logs from files that keep being appended to
Project-level configuration examples:
(The setup has changed over time, so treat the latest code as authoritative; some of the names conflict.)
Official documentation:
Taildir Source
This source is provided as a preview feature. It does not work on Windows.
Watch the specified files, and tail them in nearly real time once new lines appended to each file are detected. If new lines are still being written, this source will retry reading them while waiting for the write to complete.
This source is reliable and will not miss data even when the tailed files rotate. It periodically writes the last read position of each file to the given position file in JSON format. If Flume is stopped or goes down for some reason, it can restart tailing from the position recorded in the existing position file.
In other use cases, this source can also start tailing from an arbitrary position for each file by using the given position file. When there is no position file at the specified path, it will start tailing from the first line of each file by default.
Files will be consumed in order of their modification time. The file with the oldest modification time will be consumed first.
This source does not rename, delete, or otherwise modify the file being tailed. Currently this source does not support tailing binary files. It reads text files line by line.
type - The component type name, needs to be TAILDIR.
filegroups - Space-separated list of file groups. Each file group indicates a set of files to be tailed.
filegroups.<filegroupName> - Absolute path of the file group. Regular expression (and not file system patterns) can be used for the filename only.
positionFile - (default ~/.flume/taildir_position.json) File in JSON format to record the inode, the absolute path and the last position of each tailing file.
headers.<filegroupName>.<headerKey> - Header value which is set with the header key. Multiple headers can be specified for one file group.
byteOffsetHeader - Whether to add the byte offset of a tailed line to a header called 'byteoffset'.
skipToEnd - Whether to skip the position to EOF in the case of files not written on the position file.
idleTimeout - Time (ms) to close inactive files. If the closed file has new lines appended, this source will automatically re-open it.
writePosInterval - Interval time (ms) to write the last position of each file on the position file.
batchSize - Max number of lines to read and send to the channel at a time. Using the default is usually fine.
backoffSleepIncrement - The increment for time delay before reattempting to poll for new data, when the last attempt did not find any new data.
maxBackoffSleep - The max time delay between each reattempt to poll for new data, when the last attempt did not find any new data.
cachePatternMatching - Listing directories and applying the filename regex pattern may be time consuming for directories containing thousands of files. Caching the list of matching files can improve performance. The order in which files are consumed will also be cached. Requires that the file system keeps track of modification times with at least a 1-second granularity.
fileHeader - Whether to add a header storing the absolute path filename.
fileHeaderKey - Header key to use when appending the absolute path filename to the event header.
Example for agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /var/log/flume/taildir_position.json
a1.sources.r1.filegroups = f1 f2
a1.sources.r1.filegroups.f1 = /var/log/test1/example.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.filegroups.f2 = /var/log/test2/.*log.*
a1.sources.r1.headers.f2.headerKey1 = value2
a1.sources.r1.headers.f2.headerKey2 = value2-2
a1.sources.r1.fileHeader = true
An example of a configuration file that is currently running:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /home/web_admin/opt/v2_flume-apache170/logfile_stats/x1/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/zl/xsvr/server/xgame_1/logs/act/zl_war.*log.*
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = com.flume.dome.mysink.DBsqlSink
a1.sinks.k1.hostname = jdbc:postgresql://192.168.20.243:5432
#a1.sinks.k1.port = 5432
a1.sinks.k1.databaseName = game_log
a1.sinks.k1.tableName = zl_log_info
a1.sinks.k1.user = game
a1.sinks.k1.password = game123
a1.sinks.k1.serverId = 1
a1.sinks.k1.channel = c1
a1.sinks.k1.josnTo = true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 5000
a1.channels.c1.transactionCapacity = 5000
Monitoring a file with Flume (exec source)
The file is monitored in real time; whenever new log lines appear they are collected and sent immediately, and the sink writes them to the database.
Drawback: if the agent is interrupted, the current collection position of the file is not recorded, so after a restart only new log lines are collected and the data in between is lost. It is fast, however.
Example configuration:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
#a1.sources.r1.type = spooldir
#a1.sources.r1.spoolDir = /home/rui/log/flumespool
#a1.sources.r1.fileHeader = true
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/zl/xsvr/server/xgame_1/logs/act/zl_war.log
#a1.sources.r1.command = for i in /path/*; do cat $i; done
a1.sources.r1.channels = c1
# Describe the sink
a1.sinks.k1.type = com.flume.dome.mysink.DBsqlSink
a1.sinks.k1.hostname = jdbc:postgresql://192.168.20.243:5432
#a1.sinks.k1.port = 5432
a1.sinks.k1.databaseName = game_log
a1.sinks.k1.tableName = zl_log
a1.sinks.k1.user = game
a1.sinks.k1.password = game123
a1.sinks.k1.serverId = 1
a1.sinks.k1.channel = c1
a1.sinks.k1.josnTo = true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /home/zl/xsvr/server/xgame_1/logs/act/zl_war.log
To match multiple files:
a1.sources.r1.command = for i in /path/*; do cat $i; done
Monitoring a directory with Flume (spooldir source)
The specified directory is monitored in real time; as soon as a new file appears in the directory it is collected and sent.
Drawback: files in the directory must not be modified; text files that are still having content appended to them are not allowed.
Example configuration:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/rui/log/flumespool
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = com.flume.dome.mysink.DBsqlSink
a1.sinks.k1.hostname = jdbc:postgresql://192.168.20.243:5432
#a1.sinks.k1.port = 5432
a1.sinks.k1.databaseName = game_log
a1.sinks.k1.tableName = zl_log
a1.sinks.k1.user = game
a1.sinks.k1.password = game123
a1.sinks.k1.serverId = 4
a1.sinks.k1.channel = c1
a1.sinks.k1.josnTo = true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100
The key lines are:
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/rui/log/flumespool
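If files may still be open when they land in the spooling directory, the spooldir source can also be told which files to skip and how to mark completed ones. This is a hedged sketch: the ignorePattern value is an illustrative assumption, and .COMPLETED is the default suffix.
a1.sources.r1.fileSuffix = .COMPLETED
a1.sources.r1.ignorePattern = ^.*\.tmp$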
Monitoring a directory with Flume, with support for modified files and recorded file state
TAILDIR is a new source type added in Flume 1.7, currently the latest release. It supports files that keep changing under a directory, handles interruptions, and records the collection state of every file in the directory as JSON data.
Our log collection has since been upgraded to TAILDIR.
Example configuration:
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /home/web_admin/opt/v2_flume-apache170/logfile_stats/x1/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/zl/xsvr/server/xgame_1/logs/act/zl_war.log
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.fileHeader = true
# Describe the sink
a1.sinks.k1.type = com.flume.dome.mysink.DBsqlSink
a1.sinks.k1.hostname = jdbc:postgresql://192.168.20.243:5432
#a1.sinks.k1.port = 5432
a1.sinks.k1.databaseName = game_log
a1.sinks.k1.tableName = zl_log_info
a1.sinks.k1.user = game
a1.sinks.k1.password = game123
a1.sinks.k1.serverId = 1
a1.sinks.k1.channel = c1
a1.sinks.k1.josnTo = true
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 5000
a1.channels.c1.transactionCapacity = 5000
a1.sources.r1.type = TAILDIR
a1.sources.r1.channels = c1
a1.sources.r1.positionFile = /home/web_admin/opt/v2_flume-apache170/logfile_stats/x1/taildir_position.json
The position file records the collection state.
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /home/zl/xsvr/server/xgame_1/logs/act/zl_war.log
The path to watch; a single file or a regex match such as dir/* or dir/.*log.* is supported.
a1.sources.r1.headers.f1.headerKey1 = value1
a1.sources.r1.fileHeader = true
Flume startup script:
nohup ../bin/flume-ng agent --conf ../conf --conf-file ../conf/x1_dir_to_db_flume.conf --name a1 -Dflume.root.logger=INFO,console > x1nohup.out 2>&1 &
Put this in a shell script for easy startup and maintenance: it starts the agent as a background process and writes its output to the specified file, which makes the logs easy to inspect and debug.
We have since adopted supervisor to manage the background processes of all our projects.
1. Log4j Appender
1.1. Usage
1.1.2. Client-side log4j configuration file
(the highlighted entries are the ones that need to be configured)
log4j.rootLogger=INFO,A1,R
# ConsoleAppender out
log4j.appender.A1=org.apache.log4j.ConsoleAppender
log4j.appender.A1.layout=org.apache.log4j.PatternLayout
log4j.appender.A1.layout.ConversionPattern=%d{yyyy/MM/dd HH:mm:ss} %-5p %-10C{1} %m%n
# File out
# Change the log appender to the Log4jAppender provided by Flume
log4j.appender.R=org.apache.flume.clients.log4jappender.Log4jAppender
log4j.appender.R.File=${catalina.home}/logs/ultraIDCPServer.log
# Port to which the logs are sent; an Avro-type source must be listening on this port
log4j.appender.R.Port =44444
# IP of the host to which the logs are sent; this host runs the Avro-type source
log4j.appender.R.Hostname =localhost
log4j.appender.R.MaxFileSize=102400KB
# log4j.appender.R.MaxBackupIndex=5
log4j.appender.R.layout=org.apache.log4j.PatternLayout
log4j.appender.R.layout.ConversionPattern=%d{yyyy/MM/dd HH\:mm\:ss} %-5p %-10C{1} %m%n
log4j.appender.R.encoding=UTF-8
.ultrapower.ultracollector.webservice.MessageIntercommunionInterfaceImpl=INFO,webservice
log4j.appender.webservice=org.apache.log4j.FileAppender
log4j.appender.webservice.File=${catalina.home}/logs/logsMsgIntercommunionInterface.log
log4j.appender.webservice.layout=org.apache.log4j.PatternLayout
log4j.appender.webservice.layout.ConversionPattern=%d{yyyy/MM/dd HH\:mm\:ss}%-5p[%t]%l%X-%m%n
log4j.appender.webservice.encoding=UTF-8
Note: Log4jAppender extends AppenderSkeleton; it cannot roll over to a new file when the log file reaches a given size.
1.1.3. Flume agent configuration
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure source1
agent1.sources.source1.type = avro
agent1.sources.source1.bind = 192.168.0.141
agent1.sources.source1.port = 44444
# Describe sink1
agent1.sinks.sink1.type = FILE_ROLL
agent1.sinks.sink1.sink.directory = /home/yubojie/flume/apache-flume-1.2.0/flume-out
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
Note: a new file is generated at a fixed time interval, and each file holds the messages the agent received during that interval.
1.2. Analysis
1. Simple to use, with little work required.
2. The application must use log4j as its logging jar, and the log4j jar used in the project must be version 1.2.15 or later.
3. The application must add the jars required by Flume to the project; jar conflicts are possible and may affect the application.
4. Reliable data transport is possible. Collecting logs with the Flume Log4jAppender requires no process on the client machine: simply changing the log appender sends the log messages straight to the collector (see figure 1). This guarantees the data once the collector has received it, but messages are lost whenever the client cannot reach the collector. The improved scheme runs an agent on the client machine, so that logs written while the collector is unreachable are still collected once the connection is restored and nothing is lost (see figure 2); for reliability, a process must therefore be started on the client machine. A sketch of this two-tier layout follows.
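A rough sketch of the client-side agent for the second, more reliable layout, assuming the collector runs an avro source like the one shown in section 1.1.3; all names, addresses, and ports here are illustrative assumptions:
# Client-side agent: receives events from the Log4jAppender and forwards them to the collector
client.sources = s1
client.channels = c1
client.sinks = k1
client.sources.s1.type = avro
client.sources.s1.bind = localhost
client.sources.s1.port = 44444
client.sources.s1.channels = c1
client.channels.c1.type = memory
client.sinks.k1.type = avro
client.sinks.k1.hostname = 192.168.0.141
client.sinks.k1.port = 4545
client.sinks.k1.channel = c1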
1.3. Logging code
(“this message has DEBUG in it”);
1.4. Sample of collected data
this message has DEBUG in it
this message has DEBUG in it
2. Exec source (abandoned)
The problem with ExecSource and other asynchronous sources is that the source can not guarantee that if there is a failure to put the event into the Channel the client knows about it. In such cases, the data will be lost. As a for instance, one of the most commonly requested features is the tail -F [file]-like use case where an application writes to a log file on disk and Flume tails the file, sending each line as an event. While this is possible, there's an obvious problem: what happens if the channel fills up and Flume can't send an event? Flume has no way of indicating to the application writing the log file that it needs to retain the log or that the event hasn't been sent, for some reason. If this doesn't make sense, you need only know this: Your application can never guarantee data has been received when using a unidirectional asynchronous interface such as ExecSource! As an extension of this warning, and to be completely clear, there is absolutely zero guarantee of event delivery when using this source. You have been warned.
Note: even reliability inside a single agent cannot be guaranteed.
2.1. Usage
2.1.1. Flume agent configuration
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
# example.conf: A single-node Flume configuration
# Name the components on this agent
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure source1
#agent1.sources.source1.type = avro
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /home/yubojie/logs/ultraIDCPServer.log
#agent1.sources.source1.bind = 192.168.0.146
#agent1.sources.source1.port = 44444
agent1.sources.source1.interceptors = a
agent1.sources.source1.interceptors.a.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent1.sources.source1.interceptors.a.preserveExisting = false
agent1.sources.source1.interceptors.a.hostHeader = hostname
# Describe sink1
#agent1.sinks.sink1.type = FILE_ROLL
#agent1.sinks.sink1.sink.directory = /home/yubojie/flume/apache-flume-1.2.0/flume-out
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://localhost:9000/user/
agent1.sinks.sink1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
2.2. Analysis
1. Tail-based collection requires the host machine to be able to run the tail command, which in practice means Linux only; Windows system logs cannot be collected this way.
2. EXEC collects asynchronously, so logs can be lost; even within a single node, data completeness cannot be guaranteed.
3. As noted, tail-based collection requires the host operating system to support the tail command; a stock Windows system does not provide it.
2.3. Sample of collected data
02:36:34 INFO  LogTest     this message has DEBUG 中文 in it
02:40:12 INFO  LogTest     this message has DEBUG 中文 in it
2.4. Logging code
(“this message has DEBUG 中文 in it”);
3. Syslog
Passing messages using the syslog protocol doesn't work well for longer messages. The syslog appender for Log4j is hardcoded to linewrap around 1024 characters in order to comply with the RFC. I got a sample program logging to syslog, picking it up with a syslogUdp source, with a JSON layout (to avoid new-lines in stack traces), only to find that anything but the smallest stack trace line-wrapped anyway. I can't see a way to reliably reconstruct the stack trace once it is wrapped and sent through the flume chain. (Note: it is not certain whether this refers to version 1.2.)
Syslog TCP requires specifying eventSize; the default is 2500.
Syslog UDP is an unreliable transport; data may be lost in transit.
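A sketch of a syslog TCP source with eventSize set explicitly; the host, port, and channel names are illustrative, and 2500 bytes is the default eventSize:
a1.sources = r1
a1.sources.r1.type = syslogtcp
a1.sources.r1.host = 127.0.0.1
a1.sources.r1.port = 5140
a1.sources.r1.eventSize = 2500
a1.sources.r1.channels = c1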
3.1. Usage
3.1.1. Client-side sample code
import java.io.IOException;
import java.io.OutputStream;
import java.net.Socket;
import java.net.UnknownHostException;

public class SyslogTcp {
    public static void main(String[] args) {
        Socket client = null;
        OutputStream out = null;
        try {
            // The port was elided in the original; 5140 matches the agent configuration below.
            client = new Socket("127.0.0.1", 5140);
            out = client.getOutputStream();
            // A minimal syslog message: priority <4> followed by the body and a newline.
            String event = "<4>hello\n";
            out.write(event.getBytes());
            out.flush();
            System.out.println("sent successfully");
        } catch (UnknownHostException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (IOException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } finally {
            try {
                if (out != null) {
                    out.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            try {
                if (client != null) {
                    client.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}
3.1.2. Flume agent configuration for receiving the logs
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure source1
agent1.sources.source1.type = syslogtcp
agent1.sources.source1.bind = 127.0.0.1
agent1.sources.source1.port = 5140
# Describe sink1
#agent1.sinks.sink1.type = avro
#agent1.sinks.sink1.channels = channel1
#agent1.sinks.sink1.hostname = 192.168.0.144
#agent1.sinks.sink1.port = 44444
agent1.sinks.sink1.type = FILE_ROLL
agent1.sinks.sink1.sink.directory = E:\\file-out
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
3.2. Analysis
You have to write client code that collects the incremental log data and sends it to the Flume agent over a socket; the handling of long messages is not ideal. Reliability can be ensured in the same way as with the log4j appender.
4. Log-filtering interceptor (FLUME-1358)
Flume supports filtering events by regular expression, but the 1.2.0 source code contains no concrete implementation. According to FLUME-1358, the RegexFilteringInterceptor class can be added to the code and used.
The required steps are:
Add the class RegexFilteringInterceptor.
Modify InterceptorType, adding the mapping between the type and the class:
REGEX_FILTER(org.apache.flume.interceptor.RegexFilteringInterceptor.Builder.class)
4.1. RegexFilteringInterceptor description
This interceptor filters events selectively by interpreting the event body as text and matching the text against a configured regular expression. The supplied regular expression can be used to include events or exclude events.
type - The component type name, has to be REGEX_FILTER
regex - Regular expression for matching against events
excludeRegex - If true, regex determines events to exclude; otherwise regex determines events to include.
4.2. Usage (test configuration)
4.2.1. Flume agent configuration for receiving the logs
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure source1
agent1.sources.source1.type = avro
agent1.sources.source1.bind = localhost
agent1.sources.source1.port = 5140
agent1.sources.source1.interceptors = inter1
agent1.sources.source1.interceptors.inter1.type = REGEX_FILTER
agent1.sources.source1.interceptors.inter1.regex = .*DEBUG.*
agent1.sources.source1.interceptors.inter1.excludeRegex = false
# Describe sink1
#agent1.sinks.sink1.type = avro
#agent1.sinks.sink1.channels = channel1
#agent1.sinks.sink1.hostname = 192.168.0.144
#agent1.sinks.sink1.port = 44444
agent1.sinks.sink1.type = FILE_ROLL
agent1.sinks.sink1.sink.directory = E:\\file-out
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
5. HDFS SINK
5.1. Usage
Data written to HDFS is first created on HDFS as a .tmp file; when the file is closed, the .tmp suffix is removed. The storage scheme is similar to the file sink's: a time interval, a file size, or a number of received events can be set as the criterion for rolling to a new file; the default is 30 s.
5.2. Configurable properties
type - The component type name, needs to be hdfs
hdfs.path - HDFS directory path (eg hdfs://namenode/flume/webdata/)
hdfs.filePrefix - Name prefixed to files created by Flume in the hdfs directory
hdfs.rollInterval - Number of seconds to wait before rolling the current file (0 = never roll based on time interval)
hdfs.rollSize - File size to trigger a roll, in bytes (0 = never roll based on file size)
hdfs.rollCount - Number of events written to a file before it is rolled (0 = never roll based on number of events)
hdfs.batchSize - Number of events written to a file before it is flushed to HDFS
hdfs.txnEventMax -
hdfs.codeC - Compression codec, one of the following: gzip, bzip2, lzo, snappy
hdfs.fileType - (default SequenceFile) File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress the output file, so do not set codeC. (2) CompressedStream requires hdfs.codeC to be set to an available codec.
hdfs.maxOpenFiles -
hdfs.writeFormat - "Text" or "Writable"
hdfs.appendTimeout -
hdfs.callTimeout -
hdfs.threadsPoolSize - Number of threads per HDFS sink for HDFS IO ops (open, write, etc.)
hdfs.rollTimerPoolSize - Number of threads per HDFS sink for scheduling timed file rolling
hdfs.kerberosPrincipal - Kerberos user principal for accessing secure HDFS
hdfs.kerberosKeytab - Kerberos keytab for accessing secure HDFS
hdfs.round - Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
hdfs.roundValue - Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time
hdfs.roundUnit - The unit of the round down value: second, minute or hour
serializer - Other possible options include AVRO_EVENT or the fully-qualified class name of an implementation of the EventSerializer.Builder interface
serializer.* -
5.3. Sample agent configuration
# The configuration file needs to define the sources,
# the channels and the sinks.
# Sources, channels and sinks are defined per agent,
# in this case called 'agent'
# example.conf: A single-node Flume configuration
# Name the components on this agent
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1
# Describe/configure source1
#agent1.sources.source1.type = avro
agent1.sources.source1.type = exec
agent1.sources.source1.command = tail -f /home/yubojie/logs/ultraIDCPServer.log
#agent1.sources.source1.bind = 192.168.0.146
#agent1.sources.source1.port = 44444
agent1.sources.source1.interceptors = a
agent1.sources.source1.interceptors.a.type = org.apache.flume.interceptor.HostInterceptor$Builder
agent1.sources.source1.interceptors.a.preserveExisting = false
agent1.sources.source1.interceptors.a.hostHeader = hostname
# Describe sink1
#agent1.sinks.sink1.type = FILE_ROLL
#agent1.sinks.sink1.sink.directory = /home/yubojie/flume/apache-flume-1.2.0/flume-out
agent1.sinks.sink1.type = hdfs
agent1.sinks.sink1.hdfs.path = hdfs://192.168.98.20:9000/user/hadoop/yubojietest
agent1.sinks.sink1.hdfs.fileType = DataStream
# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100
# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1
6. Collecting files into HDFS with multiple agents
6.1. Preparation
1. Package the file-collection class into a jar and put it under flume/apache-flume-1.2.0/lib.
2. Create an empty fileSourceRecorder.properties file under flume/apache-flume-1.2.0/conf (this will be changed so that the file is created automatically if it does not exist, after which this step will no longer be necessary).
6.2. Agent configuration files
6.2.1. agent1
# example.conf: A single-node Flume configuration

# Name the components on this agent
agent1.sources = source1
agent1.sinks = sink1
agent1.channels = channel1

# Describe/configure source1
agent1.sources.source1.type = com.ultrapower.ultracollector.flume.source.file.FileSource
agent1.sources.source1.path = /home/yubojie/logs/ultraIDCPServer.log
#gbk,utf-8
agent1.sources.source1.encoding = utf-8
agent1.sources.source1.onceMaxReadByte = 999
agent1.sources.source1.cacheQueueSize = 10
agent1.sources.source1.noChangeSleepTime = 1000
agent1.sources.source1.batchCommitSize = 5
agent1.sources.source1.batchWaitTime = 500
#agent1.sources.source1.type = avro
#agent1.sources.source1.bind = localhost
#agent1.sources.source1.port = 44444

# Describe sink1
#agent1.sinks.sink1.type = logger
#agent1.sinks.sink1.type = FILE_ROLL
#agent1.sinks.sink1.sink.directory = E:/file-out
#agent1.sinks.sink1.sink.fileName = a.log
agent1.sinks.sink1.type = hdfs
#agent1.sinks.sink1.hdfs.path = hdfs://192.168.98.20:9000/user/hadoop/yubojietest
agent1.sinks.sink1.hdfs.path = hdfs://192.168.0.153:9000/user/file
agent1.sinks.sink1.hdfs.callTimeout = 20000
agent1.sinks.sink1.hdfs.fileType = DataStream
#agent1.sinks.sink1.sink.rollInterval = 30

# Use a channel which buffers events in memory
agent1.channels.channel1.type = memory
agent1.channels.channel1.capacity = 1000
agent1.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
agent1.sources.source1.channels = channel1
agent1.sinks.sink1.channel = channel1

########################## test method ########################################
######### start flume agent #########
#agent -n agent1 -f .\conf\flume-conf.properties.template.file.signle
######### client send message #########
# $ bin/flume-ng avro-client -H localhost -p 44444 -F 'F:/1/log.log'
6.2.2. agent2
# example.conf: A single-node Flume configuration

# Name the components on this agent
agent2.sources = source1
agent2.sinks = sink1
agent2.channels = channel1

# Describe/configure source1
agent2.sources.source1.type = com.ultrapower.ultracollector.flume.source.file.FileSource
agent2.sources.source1.path = /home/yubojie/logtest/logs/ultraIDCPServer.log
#gbk,utf-8
agent2.sources.source1.encoding = utf-8
agent2.sources.source1.onceMaxReadByte = 999
agent2.sources.source1.cacheQueueSize = 10
agent2.sources.source1.noChangeSleepTime = 1000
agent2.sources.source1.batchCommitSize = 5
agent2.sources.source1.batchWaitTime = 500
#agent1.sources.source1.type = avro
#agent1.sources.source1.bind = localhost
#agent1.sources.source1.port = 44444

# Describe sink1
#agent1.sinks.sink1.type = logger
#agent1.sinks.sink1.type = FILE_ROLL
#agent1.sinks.sink1.sink.directory = E:/file-out
#agent1.sinks.sink1.sink.fileName = a.log
agent2.sinks.sink1.type = hdfs
#agent1.sinks.sink1.hdfs.path = hdfs://192.168.98.20:9000/user/hadoop/yubojietest
agent2.sinks.sink1.hdfs.path = hdfs://192.168.0.153:9000/user/file
agent2.sinks.sink1.hdfs.callTimeout = 20000
agent2.sinks.sink1.hdfs.fileType = DataStream
#agent1.sinks.sink1.sink.rollInterval = 30

# Use a channel which buffers events in memory
agent2.channels.channel1.type = memory
agent2.channels.channel1.capacity = 1000
agent2.channels.channel1.transactionCapacity = 100

# Bind the source and sink to the channel
agent2.sources.source1.channels = channel1
agent2.sinks.sink1.channel = channel1

########################## test method ########################################
######### start flume agent #########
#agent -n agent1 -f .\conf\flume-conf.properties.template.file.signle
######### client send message #########
# $ bin/flume-ng avro-client -H localhost -p 44444 -F 'F:/1/log.log'
6.3. Startup commands
flume-ng agent -name agent1 -c conf -f ../conf/flume-conf.properties
# agent1 monitors /home/yubojie/logs/ultraIDCPServer.log
flume-ng agent -name agent2 -c conf -f ../conf/flume-conf2.properties
# agent2 monitors /home/yubojie/logtest/logs/ultraIDCPServer.log
6.4. Test results
1. agent1 and agent2 each monitor their own file and do not interfere with each other.
2. Each writes its output to HDFS as its own set of files.
7. Reference material:
RegexFilteringInterceptor source code
package org.apache.flume.interceptor;

import static org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.DEFAULT_EXCLUDE_EVENTS;
import static org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.DEFAULT_REGEX;
import static org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.EXCLUDE_EVENTS;
import static org.apache.flume.interceptor.RegexFilteringInterceptor.Constants.REGEX;

import java.util.List;
import java.util.regex.Pattern;

import org.apache.flume.Context;
import org.apache.flume.Event;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.common.collect.Lists;

public class RegexFilteringInterceptor implements Interceptor {

  private static final Logger logger = LoggerFactory
      .getLogger(RegexFilteringInterceptor.class);

  private final Pattern regex;
  private final boolean excludeEvents;

  /**
   * Only {@link RegexFilteringInterceptor.Builder} can build me.
   */
  private RegexFilteringInterceptor(Pattern regex, boolean excludeEvents) {
    this.regex = regex;
    this.excludeEvents = excludeEvents;
  }

  @Override
  public void initialize() {
    // no-op
  }

  /**
   * Returns the event if it passes the regular expression filter and null
   * otherwise.
   */
  @Override
  public Event intercept(Event event) {
    // We've already ensured here that at most one of includeRegex and
    // excludeRegex are defined.
    if (!excludeEvents) {
      if (regex.matcher(new String(event.getBody())).find()) {
        return event;
      } else {
        return null;
      }
    } else {
      if (regex.matcher(new String(event.getBody())).find()) {
        return null;
      } else {
        return event;
      }
    }
  }

  /**
   * Returns the set of events which pass filters, according to
   * {@link #intercept(Event)}.
   * @param events
   * @return
   */
  @Override
  public List<Event> intercept(List<Event> events) {
    List<Event> out = Lists.newArrayList();
    for (Event event : events) {
      Event outEvent = intercept(event);
      if (outEvent != null) { out.add(outEvent); }
    }
    return out;
  }

  @Override
  public void close() {
    // no-op
  }

  /**
   * Builder which builds new instance of the StaticInterceptor.
   */
  public static class Builder implements Interceptor.Builder {

    private Pattern regex;
    private boolean excludeEvents;

    @Override
    public void configure(Context context) {
      String regexString = context.getString(REGEX, DEFAULT_REGEX);
      regex = Pattern.compile(regexString);
      excludeEvents = context.getBoolean(EXCLUDE_EVENTS,
          DEFAULT_EXCLUDE_EVENTS);
    }

    @Override
    public Interceptor build() {
      logger.info(String.format(
          "Creating RegexFilteringInterceptor: regex=%s, excludeEvents=%s",
          regex, excludeEvents));
      return new RegexFilteringInterceptor(regex, excludeEvents);
    }
  }

  public static class Constants {
    public static final String REGEX = "regex";
    public static final String DEFAULT_REGEX = ".*";
    public static final String EXCLUDE_EVENTS = "excludeEvents";
    public static final boolean DEFAULT_EXCLUDE_EVENTS = false;
  }
}
InterceptorType source code
(the highlighted line is the added content)
package org.apache.flume.interceptor;

public enum InterceptorType {

  TIMESTAMP(org.apache.flume.interceptor.TimestampInterceptor.Builder.class),
  HOST(org.apache.flume.interceptor.HostInterceptor.Builder.class),
  // added mapping (the highlighted line in the original post)
  REGEX_FILTER(org.apache.flume.interceptor.RegexFilteringInterceptor.Builder.class);

  private final Class<? extends Interceptor.Builder> builderClass;

  private InterceptorType(Class<? extends Interceptor.Builder> builderClass) {
    this.builderClass = builderClass;
  }

  public Class<? extends Interceptor.Builder> getBuilderClass() {
    return builderClass;
  }
}