版上有工作中使用flume 如何使用的吗

502 Bad Gateway
502 Bad GatewayFlume研究心得
绑定的课程:
最近两天,仔细的看了一下Flume中央日志系统(版本号:1.3.X),Flume在本人看来,还是一个非常不错的日志收集系统的,其设计理念非常易用,简洁。并且是一个开源项目,基于Java语言开发,可以进行一些自定义的功能开发。运行Flume时,机器必须安装装JDK6.0以上的版本,并且,Flume目前只有Linux系统的启动脚本,没有Windows环境的启动脚本。
Flume主要由3个重要的组件购成:
Source:完成对日志数据的收集,分成transtion 和 event 打入到channel之中。 &&&&& Channel:主要提供一个队列的功能,对source提供中的数据进行简单的缓存。 &&&&& Sink:取出Channel中的数据,进行相应的存储文件系统,数据库,或者提交到远程服务器。
对现有程序改动最小的使用方式是使用是直接读取程序原来记录的日志文件,基本可以实现无缝接入,不需要对现有程序进行任何改动。
对于直接读取文件Source,有两种方式:
ExecSource:以运行Linux命令的方式,持续的输出最新的数据,如tail -F 文件名指令,在这种方式下,取的文件名必须是指定的。 &&&&& SpoolSource:是监测配置的目录下新增的文件,并将文件中的数据读取出来。需要注意两点:1、拷贝到spool目录下的文件不可以再打开编辑。2、spool目录下不可包含相应的子目录。在实际使用的过程中,可以结合log4j使用,使用log4j的时候,将log4j的文件分割机制设为1分钟一次,将文件拷贝到spool的监控目录。log4j有一个TimeRolling的插件,可以把log4j分割的文件到spool目录。基本实现了实时的监控。Flume在传完文件之后,将会修改文件的后缀,变为.COMPLETED(后缀也可以在配置文件中灵活指定)
ExecSource,SpoolSource对比:ExecSource可以实现对日志的实时收集,但是存在Flume不运行或者指令执行出错时,将无法收集到日志数据,无法何证日志数据的完整性。SpoolSource虽然无法实现实时的收集数据,但是可以使用以分钟的方式分割文件,趋近于实时。如果应用无法实现以分钟切割日志文件的话,可以两种收集方式结合使用。
Channel有多种方式:有MemoryChannel,JDBC Channel,MemoryRecoverChannel,FileChannel。MemoryChannel可以实现高速的吞吐,但是无法保证数据的完整性。MemoryRecoverChannel在官方文档的建议上已经建义使用FileChannel来替换。FileChannel保证数据的完整性与一致性。在具体配置不现的FileChannel时,建议FileChannel设置的目录和程序日志文件保存的目录设成不同的磁盘,以便提高效率。
Sink在设置存储数据时,可以向文件系统中,数据库中,hadoop中储数据,在日志数据较少时,可以将数据存储在文件系中,并且设定一定的时间间隔保存数据。在日志数据较多时,可以将相应的日志数据存储到Hadoop中,便于日后进行相应的数据分析。
已学习课程数:4
已发表笔记数:117
Java线程池使用说明一简介线程的使用在java中占有极其重要的地位,在jdk1.4极其之前的jdk版本中,关于线程池的使用是极其简陋的。在jdk1.5之后这一情况有了很大的改观。Jdk1.5之后加入了java.util.concurrent包,这个包中主要介绍java中线程以及线程池的使用。为我们在开发中处理线程的问题提供了非常大的帮助。二:线程池线程池的作用:线程池作用就是限制系统中执行线程的数量。 根据系统的环境情况,可以自动或手动设置线程数量,达到运行的最佳效果;少了浪费了系统资源,多了造成系统拥
  本篇文章通过一个实际工作中遇到的例子开始吧:  工程使用Spring+Mybatis+Mysql开发。具体的业务逻辑很重,对象之间一层一层的嵌套。和数据库表对应的是大量的model类,而和前端交互的是Vo类。现在需要做一个需求,有两种方式来实现:  使用现有的Service接口,或者自己在编写一些用到的接口,手动使用Java代码来分别调用Service接口来查出各个model,然后在业务层将model转换为vo,最后返回给前端json串。&&&&& 为
转自:http://miaoxiaodong78./blog/static//& Decorator设计模式是典型的结构型模式(在GOF的那本模式的Bible中将模式分为:1.创建型模式;2.结构型模式;3.行为模式三种)。它的主要用意是:动态地为对象添加一些额外的功能。(记住上面两种颜色的词汇,理解装饰器模式的精髓所在!)下面是GOF的《Element of reusable Object-Oriented Software
序列化:将java对象转换为字节序列的过程叫做序列化反序列化:将字节对象转换为java对象的过程叫做反序列化通常情况下,序列化有两种用途:、1) 把对象的字节序列永久的保存在硬盘中2)在网络上传输对象的字节序列相应的API  java.io.ObjectOutputStream          writeObject(Object obj)  java.io.ObjectInputStream          readObject()只有实现了Serializable或者Externalizable接
代理模式的作用是:为其他对象提供一种代理以控制对这个对象的访问。在某些情况下,一个客户不想或者不能直接引用另一个对象,而代理对象可以在客户端和目标对象之间起到中介的作用。
代理模式一般涉及到的角色有:
  抽象角色:声明真实对象和代理对象的共同接口;
  代理角色:代理对象角色内部含有对真实对象的引用,从而可以操作真实对象,同时代理对象提供与真实对象相同的接口以便在任何时刻都能代替真实对象。同时,代理对象可以在执行真实对象操作时,附加其他的操作,相当于对真实对象进行封装。
  真实角色:代
首先什么是注解?  最常见的是,在我们使用Eclipse等工具编写java代码的时候,有时候会出现一些比如@Deprecated,@Override,@SuppressWarnings等东东。这个就是常见的几种注解。   在开发Java程序,尤其是Java EE应用的时候,总是免不了与各种配置文件打交道。以Java EE中典型的S(pring)S(truts)H(ibernate)架构来说,Spring、Struts和Hibernate这三个框架都有自己的XML格式的配置文件。这些配置文件需要与Java源
以前没有写笔记的习惯,现在慢慢的发现及时总结是多么的重要了,呵呵。虽然才大二,但是也快要毕业了,要加油了。这一篇文章主要关于java多线程,主要还是以例子来驱动的。因为讲解多线程的书籍和文章已经很多了,所以我也不好意思多说,呵呵、大家可以去参考一些那些书籍。我这个文章主要关于实际的一些问题。同时也算是我以后复习的资料吧,。呵呵大家多多指教。同时希望多结交一些技术上的朋友。谢谢。-----------------------------------------------------------------
在一个类编译完成之后,下一步就需要开始使用类,如果要使用一个类,肯定离不开JVM。在程序执行中JVM通过装载,链接,初始化这3个步骤完成。类的装载是通过类加载器完成的,加载器将.class文件的二进制文件装入JVM的方法区,并且在堆区创建描述这个类的java.lang.Class对象。用来封装数据。但是同一个类只会被类装载器装载以前链接就是把二进制数据组装为可以运行的状态。链接分为校验,准备,解析这3个阶段校验一般用来确认此二进制文件是否适合当前的JVM(版本),准备就是为静态成员分配内存空间,。并设置默
本篇文章依旧采用小例子来说明,因为我始终觉的,案例驱动是最好的,要不然只看理论的话,看了也不懂,不过建议大家在看完文章之后,在回过头去看看理论,会有更好的理解。下面开始正文。【案例1】通过一个对象获得完整的包名和类名?package R/*** 通过一个对象获得完整的包名和类名* */class Demo{//other codes...}class hello{public static void main(String[] args) {Demo
前提Hadoop版本:hadoop-0.20.2概述在上一篇文章中HDFS源码分析(9):DFSClient初步介绍了HDFS客户端的相关内容,但由于篇幅关系,没有对HDFS的输入/输出流进行介绍,而这正是本文的重点。数据的读取和写入是客户端最重要的功能,也是最主要的逻辑,本文将分成输入和输出两部分对HDFS的文件流进行分析。主要的类位于org.apache.hadoop.hdfs.DFSClient类中。DFSInputStreamDFSInputStream的主要功能是向namenode获取块信息,并
Flume是一个分布式的,质量可靠,可有效地收集,汇总和来自许多不同来源的大量日志数据到集中的数据存储系统。Flume NG 1.x 是Flume OG 0.9.x的重构版本,俨然从一个分布式系统变成了传输工具。新的架构如下:&&1.1&&&&Flume特点Flume的特点是可以通过手工配置,可以自动收集日志文件,在大数据处理及各种复杂的情况下,flume经常被用来作为数据处理的工具,Flume搜集日志的方式多种多样,比如可以检测文件夹的变化,可
业务系统需要收集监控系统日志,想到了hadoop的flume。经过试验,虽说功能不算足够强大,但基本上能够满足功能需求。Flume 是一个分布式、可靠和高可用的服务日志收集工具,能够和hadoop,hive等配置完成日志收集,存储,分析处理等工作,更详细的介绍可以参见apache网站。下面介绍下简单的安装配置方法
1,网上下载flume-ng安装包,分别部署在收集和接收日志文件的服务器上,服务器上需安装jdk 1.6以上, http://flume.apache.org/download.html ta
1、java版需要高于1.6,
$ yum install java 2、基础前提内容安装 安装详细步骤见:/display/CDHDOC/CDH3+Installation 2.1、$ wget /redhat/cdh/cdh3-repository-1.0-1.noarch.rpm 2.2、$ yum --nogpgcheck localinstall cdh3-repository-1.0-1.n
Flume 安装& &
wget /downloads/cloudera/flume/flume-distribution-0.9.4-bin.tar.gz export FLUME_HOME=/home/hadoop/hadoop/flumeexport PATH=.:$PATH::$FLUME_HOME/bin conf/flume?conf.xmlWatchdogNode CommonAgentCollectorMasterMast
flume的原理和使用&&&概述 flume是cloudera公司的一款高性能、高可能的分布式日志收集系统。flume的核心是把数据从数据源收集过来,再送到目的地。为了保证输送一定成功,在送到目的地之前,会先缓存数据,待数据真正到达目的地后,删除自己缓存的数据。flume传输的数据的基本单位是event,如果是文本文件,通常是一行记录,这也是事务的基本单位。flume运行的核心是agent。它是一个完整的数据收集工具,含有三个核心组件,分别是source、channel、si
Clinet是一个将原始log包装成events并且发送它们到一个或多个agent的实体。例如Flume log4j Appender可以使用Client SDK (org.apache.flume.api)定制特定的Client目的是从数据源系统中解耦Flume在flume的拓扑结构中不是必须的
一个Agent包含Sources, Channels, Sinks和其他组件,它利用这些组件将events从一个节点传输到另一个节点或最终目的。 agent是flume流的基础部分。flume为这些组件提供了配置、生命周期管理、监控支持。
Source负责接收events或通过特殊机制产生events,并将events批量的放到一个或多个Channels。有event驱动和轮询2种类型的Source 不同类型的Source:和众所周知的系统集成的Sources: Syslog, Netcat自动生成事件的Sources: Exec, SEQ用于Agent和Agent之间通信的IPC Sources: AvroSource必须至少和一个channel关联
学了几天storm的基础,发现如果有hadoop基础,再理解起概念来,容易的多。不过,涉及到一些独有的东西,如调度,如并发度,还是很麻烦。那么,从这一篇开始,力争清晰的梳理这些知识。一、基础概念1.1 Topology 原意拓扑。可以把他理解为是hadoop中的job,他是把一系列的任务项组装后的一个结果。1.2 Spout 是任务的一种,作用是读取数据,然后组装成一定的格式,发射出去。1.3 Bolt 是另一种任务,接收Spout或者上级Bolt发射的任务,进行处理。处理后,也有发射功能,当然如果已经处
ZK&&Web界面, node-zk-browserWeb展示每个path的属性和数据。需要安装Node.js&和&node-zookeeper &taokeeper为zookeeper做了什么?1.CPU/MEM/LOAD的监控//load2.&ZK日志目录所在磁盘剩余空间监控3.&单机连接数的峰值报警4.&单机&Watcher数的峰值报警5.&节点自检:是指对集群中每个IP所在ZK节点上的PATH:
学习编程并没有那么困难,你只是需要一位大神来手把手来带领,专业的Java老师在线讲课,帮助你有效快速的掌握Java,真心想要学习java的伙伴可以加我们,想学习编程不要看网上的这些视频教程,没有任何意义,你操作过程中会遇到大量的问题,学习编程可以加群【116,804,208】这里有很多人指导你一起学习,Java零基础到项目实战公开课,我们的课程偏向实战性,想要学习Java的伙伴欢迎到我们的课堂来一起学习。
类似的数据结构使用awk进行分析。第一种方式假定&ip地址在逗号分隔的第8段中,可以进行如下操作(当然这也的假定条件本身就是有问题的) awk: cat&access_log.*&|&awk&-F&','&'{print&$8}' | awk -F ':' '{if($1==&\&ip\&&){print $2}}' |sort |uniq -c |sort -rn
&这个问题网上给出的一般都是修改编码集,但对有些朋友来说,貌似不好使,我自己就是受害者。
原因也确实是编码集,但是很多人像我都搞错了顺序呢!下面是原因以及解决! & & & & & &我安装mysql的时候默认编码是utf8&,然后我启动hive&它给我自动create了若干张表,然后我才把表的编码集改成latin,但是其实里面表的编码集还是utf8&导致后面的问题我把mysql下的database
1Hbase日常运维&& 1.1&&&&&&&监控Hbase运行状况1.1.1操作系统1.1.1.1&IOa.群集网络IO,磁盘IO,HDFS IOIO越大说明文件读写操作越多。当IO突然增加时,有可能:1.compact队列较大,集群正在进行大量压缩操作。2.正在执行mapreduce作业可以通过CDH前台查看整个集群综合的数据或进入指定机器的前台查看单台机器的数据:b.Io wait磁盘IO对集群的影响
o Storm优势 ? 1. 简单的编程模型。类似于MapReduce降低了并行批处 理复杂性,Storm降低了进行实时处理的复杂性。 ? 2. 服务化,一个服务框架,支持热部署,即时上线或下线App. ? 3. 可以使用各种编程语言。你可以在Storm之上使用各种 编程语言。默认支持Clojure、Java、Ruby和Python。要 增加对其他语言的支持,只需实现一个简单的Storm通信 协议即可。 ? 4. 容错性。Storm会管理工作进程和节点的故障。 ? 5. 水平扩展。计算是在多个线程、进程和
转自 如何在Latex中高亮显示R语言代码,请看下面的示例:\documentclass{beamer}
\usepackage{ctex}
\usepackage{listings} %插入代码
\usepackage{xcolor} %代码高亮
\lstset{numbers=left, %设置行号位置
numberstyle=\tiny, %设置行号大小
keywordstyle=\color{blue}, %设置关键字颜色
促使我想写这个系列的文章,是因为看到总有人提到相同的问题,犯相同的错误,曾经我也是这么过来的,不忍心看到后面还有人经常这么曲折的过来。先了解下,在bash脚本中,有三种引号1. 单引号 '
2. 双引号 &
3. 反引号 `
两个单引号包围起来的字符串就是普通的字符串,它将保留原始的字面意思.
两个双引号包围起来的字符串,部分特殊字符将起到它们的作用.
这些特殊字符有: 美元符$, 反斜杠\, 反引号,
Storm单机+zookeeper集群安装
1、安装zookeeper集群
2、准备机器
10.10.3.44 flumemaster1
10.10.3.129 flumemaster2
10.10.3.132 flumecollector1
10.10.3.115 flumeNg1
3、配置hosts文件(4台服务器上面都需要配置)
vi /etc/hosts
需求:将mysql中的表b05_age的数据导入hive中
yum install sqoop(sqoop必须安装在有hive client的服务器上面,如果没有执行yum install hive)
复制mysql的驱动jar到/usr/lib/sqoop/lib下面
2、异常处理
正确命令:(将关系型数据的表结构复制到hive中)
sudo -u hive sqoop create-hive-table --connect jdbc:my
准备机器:
192.168.1.241
192.168.1.242
192.168.1.243
1、查看ip地址是否为静态ip,如果不是进行配置
vim /etc/sysconfig/network-scripts/ifcfg-eth0
DEVICE=eth0
TYPE=Ethernet
ONBOOT=yes
NM_CONTROLLED=yes
BOOTPROTO=none
IPADDR=192.168.1.241
1. 我用的Oracle 客户端最好的工具是PL/SQL Developer, 当然,如果用免费的Toad也不错,感觉现在用Toad的人还是挺多的。
2. Oracle SQL如果想提高速度有几个方式
1)创建索引,尽量建立唯一索引
2)当要创建的索引列的值取值比较小,建议创建Bitmap的索引而不是默认的Btree的。(比如性别,学历等)
3)在where条件后尽量采用数字类型的字段,比varchar的速度快
4)尽量不用用IN,Not In,union这样的条件查
1. 通过freemarker静态化
2. 通过jsp filter静态化
主要思路:请求servlet-&判断静态文件是否存在并且静态文件创建时间是否在阀值之内--&如果不是,则访问数据库生成静态文件-&否则直接跳转静态文件
然后通过urlReWrite直接将访问servlet的请求改为html,完成seo
最后通过SQUID缓存前台数据
一、从数据库中取相应数据并替换掉模板中的对应标签,下面是一个简单的示例
java.io.IOE
java.io.InputS
java.io.OutputS
java.io.OutputStreamW
java.io.PrintW
java.net.InetSocketA
java.util.L
java.util.M
服务器优化说明
WEB服务器优化
更换tomcat5为tomcat6
版本copy测试服务版本即可
加大tomcat内存
修改bin下的catalina.sh文件,增加青绿色部分
JAVA_OPTS='-Xms768m -Xmx1648m -XX:MaxPermSize=512m'
加大tomcat连接数
修改conf下的server.xml文件,修改青绿色部分参数值
s.username,
decode(l.type,'tm','table lock','tx','row lock',null) lock_level,
o.object_name,
o.object_type,
s.serial#,
s.terminal,
s.machine,
s.program,
from v$session s,v$lock l,dba_objects o
where l.sid = s.s
一直以来都想接触Storm实时计算这块的东西,最近在群里看到上海一哥们罗宝写的Flume+Kafka+Storm的实时日志流系统的搭建文档,自己也跟着整了一遍,之前罗宝的文章中有一些要注意点没提到的,以后一些写错的点,在这边我会做修正;内容应该说绝大部分引用罗宝的文章的,这里要谢谢罗宝兄弟,还有写这篇文章@晨色星空J2EE也给了我很大帮助,这里也谢谢@晨色星空J2EE之前在弄这个的时候,跟群里的一些人讨论过,有的人说,直接用storm不就可以做实时处理了,用不着那么麻烦;其实不然,做软件开发的都知道模块化
1. Storm介绍2. Storm环境配置3. Storm程序流程4. Storm总结及问题1. Storm介绍 1.1&实时流计算背景 随着互联网的更进一步发展,信息浏览、搜索、关系交互传递型,以及电子商务、互联网旅游生活产品等将生活中的流通环节在线化。对于实时性的要求进一步提升,而信息的交互和沟通正在从点对点往信息链甚至信息网的方向发展,这样必然带来数据在各个维度的交叉关联,数据爆炸已不可避免。因此流式处理和NoSQL产品应运而生,分别解决实时框架和数据大 规模存储计算的问题。 流式处理可Flume+Kafka收集Docker容器内分布式日志应用实践 - 为程序员服务
为程序员服务
Flume+Kafka收集Docker容器内分布式日志应用实践
1 背景和问题
随着云计算、PaaS平台的普及,虚拟化、容器化等技术的应用,例如Docker等技术,越来越多的服务会部署在云端。通常,我们需要需要获取日志,来进行监控、分析、预测、统计等工作,但是云端的服务不是物理的固定资源,日志获取的难度增加了,以往可以SSH登陆的或者FTP获取的,现在可不那么容易获得,但这又是工程师迫切需要的,最典型的场景便是:上线过程中,一切都在GUI化的PaaS平台点点鼠标完成,但是我们需要结合tail -F、grep等命令来观察日志,判断是否上线成功。当然这是一种情况,完善的PaaS平台会为我们完成这个工作,但是还有非常多的ad-hoc的需求,PaaS平台无法满足我们,我们需要日志。本文就给出了在分布式环境下,容器化的服务中的分散日志,如何集中收集的一种方法。
2 设计约束和需求描述
做任何设计之前,都需要明确应用场景、功能需求和非功能需求。
2.1 应用场景
分布式环境下可承载百台服务器产生的日志,单条数据日志小于1k,最大不超过50k,日志总大小每天小于500G。
2.2 功能需求
1)集中收集所有服务日志。
2)可区分来源,按服务、模块和天粒度切分。
2.3 非功能需求
1)不侵入服务进程,收集日志功能需独立部署,占用系统资源可控。
2)实时性,低延迟,从产生日志到集中存储延迟小于4s。
3)持久化,保留最近N天。
4)尽量递送日志即可,不要求不丢不重,但比例应该不超过一个阈值(例如万分之一)。
4)可以容忍不严格有序。
5)收集服务属于线下离线功能,可用性要求不高,全年满足3个9即可。
3 实现架构
一种方案实现的架构如下图所示:
3.1 Producer层分析
PaaS平台内的服务假设部署在Docker容器内,那么为了满足非功能需求#1,独立另外一个进程负责收集日志,因此不侵入服务框架和进程。采用
来进行日志的收集,这个开源的组件非常强大,可以看做一种监控、生产增量,并且可以发布、消费的模型,Source就是源,是增量源,Channel是缓冲通道,这里使用内存队列缓冲区,Sink就是槽,是个消费的地方。容器内的Source就是执行tail -F这个命令的去利用linux的标准输出读取增量日志,Sink是一个Kafka的实现,用于推送消息到分布式消息中间件。
3.2 Broker层分析
PaaS平台内的多个容器,会存在多个Flume NG的客户端去推送消息到Kafka消息中间件。Kafka是一个吞吐量、性能非常高的消息中间件,采用单个分区按照顺序的写入的方式工作,并且支持按照offset偏移量随机读取的特性,因此非常适合做topic发布订阅模型的实现。这里图中有多个Kafka,是因为支持集群特性,容器内的Flume NG客户端可以连接若干个Kafka的broker发布日志,也可以理解为连接若干个topic下的分区,这样可以实现高吞吐,一来可以在Flume NG内部做打包批量发送来减轻QPS压力,二来可以分散到多个分区写入,同时Kafka还会指定replica备份个数,保证写入某个master后还需要写入N个备份,这里设置为2,没有采用常用的分布式系统的3,是因为尽量保证高并发特性,满足非功能需求中的#4。
3.3 Consumer层分析
消费Kafka增量的也是一个Flume NG,可以看出它的强大之处,在于可以接入任意的数据源,都是可插拔的实现,通过少量配置即可。这里使用Kafka Source订阅topic,收集过来的日志同样先入内存缓冲区,之后使用一个File Sink写入文件,为了满足功能需求#2,可区分来源,按服务、模块和天粒度切分,我自己实现了一个Sink,叫做RollingByTypeAndDayFileSink,源代码放到了
上,可以从这个
jar,直接放到flume的lib目录即可。
4 实践方法
4.1 容器内配置
Dockerfile
Dockerfile是容器内程序的运行脚本,里面会含有不少docker自带的命令,下面是要典型的Dockerfile,BASE_IMAGE是一个包含了运行程序以及flume bin的镜像,比较重要的就是ENTRYPOINT,主要利用supervisord来保证容器内进程的高可用。
FROM ${BASE_IMAGE}
MAINTAINER ${MAINTAINER}
ENV REFRESH_AT ${REFRESH_AT}
RUN mkdir -p /opt/${MODULE_NAME}
ADD ${PACKAGE_NAME} /opt/${MODULE_NAME}/
COPY service.supervisord.conf /etc/supervisord.conf.d/service.supervisord.conf
COPY supervisor-msoa-wrapper.sh /opt/${MODULE_NAME}/supervisor-msoa-wrapper.sh
RUN chmod +x /opt/${MODULE_NAME}/supervisor-msoa-wrapper.sh
RUN chmod +x /opt/${MODULE_NAME}/*.sh
ENTRYPOINT ["/usr/bin/supervisord", "-c", "/etc/supervisord.conf"]
下面是supervisord的配置文件,执行supervisor-msoa-wrapper.sh脚本。
[program:${MODULE_NAME}]
command=/opt/${MODULE_NAME}/supervisor-msoa-wrapper.sh
下面是supervisor-msoa-wrapper.sh,这个脚本内的start.sh或者stop.sh就是应用程序的启动和停止脚本,这里的背景是我们的启停的脚本都是在后台运行的,因此不会阻塞当前进程,因此直接退出了,Docker就会认为程序结束,因此应用生命周期也结束,这里使用wait命令来进行一个阻塞,这样就可以保证即使后台运行的进程,我们可以看似是前台跑的。
这里加入了flume的运行命令,–conf后面的参数标示会去这个文件夹下面寻找flume-env.sh,里面可以定义JAVA_HOME和JAVA_OPTS。–conf-file指定flume实际的source、channel、sink等的配置。
#! /bin/bash
function shutdown()
echo "Shutting down Service"
unset SERVICE_PID # Necessary in some cases
cd /opt/${MODULE_NAME}
source stop.sh
## 停止进程
cd /opt/${MODULE_NAME}
echo "Stopping Service"
source stop.sh
## 启动进程
echo "Starting Service"
source start.sh
export SERVICE_PID=$!
## 启动Flume NG agent,等待4s日志由start.sh生成
nohup /opt/apache-flume-1.6.0-bin/bin/flume-ng agent --conf /opt/apache-flume-1.6.0-bin/conf --conf-file /opt/apache-flume-1.6.0-bin/conf/logback-to-kafka.conf --name a1 -Dflume.root.logger=INFO,console &
# Allow any signal which would kill a process to stop Service
trap shutdown HUP INT QUIT ABRT KILL ALRM TERM TSTP
echo "Waiting for $SERVICE_PID"
wait $SERVICE_PID
source本应该采用
,执行tailf -F日志文件即可。但是这里使用了一个自行开发的
,源代码可以在
上找到。之所以采用自定义的,是因为需要将一些固定的信息传递下去,例如服务/模块的名称以及分布式服务所在容器的hostname,便于收集方根据这个标记来区分日志。如果这里你发现为什么不用flume的拦截器interceptor来做这个工作,加入header中一些KV不就OK了吗?这是个小坑,我后续会解释一下。
例如原来日志的一行为:
12:59:31,080 [main]
fountain.runner.CustomConsumerFactoryPostProcessor
(CustomConsumerFactoryPostProcessor.java:91)
-Start to init IoC container by loading XML bean definitions from classpath:fountain-consumer-stdout.xml
按照如下配置,那么实际传递给Channel的日志为:
service1##$$##m1-ocean-1004.cp
12:59:31,080 [main]
fountain.runner.CustomConsumerFactoryPostProcessor
(CustomConsumerFactoryPostProcessor.java:91)
-Start to init IoC container by loading XML bean definitions from classpath:fountain-consumer-stdout.xml
channel使用内存缓冲队列,大小标识可容乃的日志条数(event size),事务可以控制一次性从source以及一次性给sink的批量日志条数,实际内部有个timeout超时,可通过keepAlive参数设置,超时后仍然会推送过去,默认为3s。
,配置broker的list列表以及topic的名称,需要ACK与否,以及一次性批量发送的日志大小,默认5条一个包,如果并发很大可以把这个值扩大,加大吞吐。
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = com.baidu.unbiz.flume.sink.StaticLinePrefixExecSource
a1.mand = tail -F /opt/MODULE_NAME/log/logback.log
a1.sources.r1.channels = c1
a1.sources.r1.prefix=service1
a1.sources.r1.separator=##$$##
a1.sources.r1.suffix=m1-ocean-1004.cp
# Describe the sink
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = keplerlog
a1.sinks.k1.brokerList = gzns-cm-n01.gzns:9092,gzns-cm-n02.gzn
a1.sinks.k1.requiredAcks = 0
a1.sinks.k1.batchSize = 5
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
4.2 Broker配置
,这里新建一个名称叫做keplerlog的topic,备份数量为2,分区为4。
& bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 2 --partitions 4 --topic keplerlog
制造一些增量信息,例如如下脚本,在终端内可以随便输入一些字符串:
& bin/kafka-console-producer.sh --broker-list localhost:9092 --topic keplerlog
打开另外一个终端,订阅topic,确认可以看到producer的输入的字符串即可,即表示联通了。
& bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic keplerlog --from-beginning
4.3 集中接收日志配置
首先source采用flume官方提供的
,配置好zookeeper的地址,会去找可用的broker list进行日志的订阅接收。channel采用内存缓存队列。sink由于我们的需求是按照服务名称和日期切分日志,而官方提供的默认
,只能按照时间戳,和时间interval来切分。
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
a1.sources.r1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.r1.zookeeperConnect = localhost:2181
a1.sources.r1.topic = keplerlog
a1.sources.r1.batchSize = 5
a1.sources.r1.groupId = flume-collector
a1.sources.r1.kafka.consumer.timeout.ms = 800
# Describe the sink
a1.sinks.k1.type = com.baidu.unbiz.flume.sink.RollingByTypeAndDayFileSink
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /home/work/data/kepler-log
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000000
a1.channels.c1.transactionCapacity = 100
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
定制版RollingByTypeAndDayFileSink
。RollingByTypeAndDayFileSink使用有两个条件:
1)Event header中必须有timestamp,否则会忽略事件,并且会抛出{@link InputNotSpecifiedException}
2)Event body如果是按照##$$##分隔的,那么把分隔之前的字符串当做模块名称(module name)来处理;如果没有则默认为default文件名。
输出到本地文件,首先要设置一个跟目录,通过sink.directory设置。其次根据条件#2中提取出来的module name作为文件名称前缀,timestamp日志作为文件名称后缀,例如文件名为portal.或者default.。
规整完的一个文件目录形式如下,可以看出汇集了众多服务的日志,并且按照服务名称、时间进行了区分:
~/data/kepler-log$ ls
authorization.
不得不提的两个坑
回到前两节提到的自定义了一个StaticLinePrefixExecSource来进行添加一些前缀的工作。由于要区分来源的服务/模块名称,并且按照时间来切分,根据官方flume文档,完全可以采用如下的Source拦截器配置。例如i1表示时间戳,i2表示默认的静态变量KV,key=module,value=portal。
a1.sources.r1.interceptors = i2 i1
a1.sources.r1.interceptors.i1.type = timestamp
a1.sources.r1.interceptors.i2.type = static
a1.sources.r1.interceptors.i2.key = module
a1.sources.r1.interceptors.i2.value = portal
但是flume官方默认的
while (eventList.size() & batchUpperLimit &&
System.currentTimeMillis() & batchEndTime) {
iterStatus = hasNext();
if (iterStatus) {
// get next message
MessageAndMetadata&byte[], byte[]& messageAndMetadata = it.next();
kafkaMessage = messageAndMetadata.message();
kafkaKey = messageAndMetadata.key();
// Add headers to event (topic, timestamp, and key)
headers = new HashMap&String, String&();
headers.put(KafkaSourceConstants.TIMESTAMP,
String.valueOf(System.currentTimeMillis()));
headers.put(KafkaSourceConstants.TOPIC, topic);
if (kafkaKey != null) {
headers.put(KafkaSourceConstants.KEY, new String(kafkaKey));
if (log.isDebugEnabled()) {
log.debug("Message: {}", new String(kafkaMessage));
event = EventBuilder.withBody(kafkaMessage, headers);
eventList.add(event);
可以看出自己重写了Event header中的KV,丢弃了发送过来的header,因为这个坑的存在因此,tailf -F在event body中在前面指定模块/服务名称,然后RollingByTypeAndDayFileSink会按照分隔符切分。否则下游无法能达到KV。
exec source需要执行tail -F命令来通过标准输出和标准错误一行一行的读取,但是如果把tail -F封装在一个脚本中,脚本中再执行一些管道命令,例如tail -F logback.log | awk ‘{print "portal##$$##"$0}’,那么exec source总是会把最近的输出丢弃掉,导致追加到文件末尾的日志有一些无法总是“姗姗来迟”,除非有新的日志追加,他们才会被“挤”出来。这个问题比较诡异。暂时没有细致研究。以示后人不要采坑。
从这个分布式服务分散日志的集中收集方法,可以看出利用一些开源组件,可以非常方便的解决我们日常工作中所发现的问题,而这个发现问题和解决问题的能力才是工程师的基本素质要求。对于其不满足需求的,需要具备有钻研精神,知其然还要知其所以然的去做一些ad-hoc工作,才可以更加好的leverage这些组件。
另外,日志的收集只是起点,利用宝贵的数据,后面的使用场景和想象空间都会非常大,例如
1)利用Spark streaming在一个时间窗口内计算日志,做流量控制和访问限制。
2)使用awk脚本、scala语言的高级函数做单机的访问统计分析,或者Hadoop、Spark做大数据的统计分析。
3)除了端口存活和语义监控,利用实时计算处理日志,做ERROR、异常等信息的过滤,实现服务真正的健康保障和预警监控。
4)收集的日志可以通过logstash导入Elastic Search,使用ELK方式做日志查询使用。
转载时请注明转自
推荐阅读:
neo:面对恐惧男人的战栗往往不是来自胆怯,而是极度的兴奋
您可以通过下面的社交媒体联系/了解作者的更多信息:
相关聚客文章

我要回帖

更多关于 flume的使用 的文章

 

随机推荐