sparkspark streaming hdfs怎么实时读关系型数据库数据

1057人阅读
saprk streaming(1)
spark streaming是一个分布式高可靠的准实时处理系统,其数据源可以flume、Hdfs、kafka等,其结果可以保存到关系型数据库,HDFS上。保存到HDFS上相对简单,一句话就可以搞定,但是要保存到关系数据库中,相对比较麻烦,既要链接数据库,又要知道数据字段。
我们首先写个wordcount程序测试一下,通过网络发数据到spark streaming
发数据程序如下
import java.io.{PrintWriter}
import java.net.ServerSocket
import scala.io.Source
object SaleSimulation {
def index(length: Int) = {
import java.util.Random
val rdm = new Random
rdm.nextInt(length)
def main(args: Array[String]) {
if (args.length != 3) {
System.err.println(&Usage: &filename& &port& &millisecond&&)
System.exit(1)
val filename = args(0)
val lines = Source.fromFile(filename).getLines.toList
val filerow = lines.length
val listener = new ServerSocket(args(1).toInt)
while (true) {
val socket = listener.accept()
new Thread() {
override def run = {
println(&Got client connected from: & + socket.getInetAddress)
val out = new PrintWriter(socket.getOutputStream(), true)
while (true) {
Thread.sleep(args(2).toLong)
val content = lines(index(filerow))
println(content)
out.write(content + '\n')
out.flush()
socket.close()
}打成jar包后运行
java -cp spark_streaming_test.jar com.pinganfu.ss.SaleSimulation /spark/people.txt
spark streaming程序如下:
import java.sql.{PreparedStatement, Connection, DriverManager}
import java.util.concurrent.atomic.AtomicInteger
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
//No need to call Class.forName(&com.mysql.jdbc.Driver&) to register Driver?
object SparkStreamingForPartition {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName(&NetCatWordCount&)
conf.setMaster(&local[3]&)
val ssc = new StreamingContext(conf, Seconds(5))
val dstream = ssc.socketTextStream(&hadoopMaster&, 9999).flatMap(_.split(& &)).map(x =& (x, 1)).reduceByKey(_ + _)
dstream.foreachRDD(rdd =& {
//embedded function
def func(records: Iterator[(String,Int)]) {
//Connect the mysql
var conn: Connection = null
var stmt: PreparedStatement = null
val url = &jdbc:mysql://hadoopMaster:3306/streaming&;
val user = &root&;
val password = &hadoop&
conn = DriverManager.getConnection(url, user, password)
records.foreach(word =& {
val sql = &insert into wordcounts values (?,?)&;
stmt = conn.prepareStatement(sql);
stmt.setString(1, word._1)
stmt.setInt(2, word._2)
stmt.executeUpdate();
case e: Exception =& e.printStackTrace()
} finally {
if (stmt != null) {
stmt.close()
if (conn != null) {
conn.close()
val repartitionedRDD = rdd.repartition(3)
repartitionedRDD.foreachPartition(func)
ssc.start()
ssc.awaitTermination()
1. DStream.foreachRDD是一个Output Operation,DStream.foreachRDD是数据落地很常用的方法
2. 获取MySQL Connection的操作应该放在foreachRDD的参数(是一个RDD[T]=&Unit的函数类型),这样,当
foreachRDD方法在每个Worker上执行时,连接是在Worker上创建。如果Connection的获取放到dstream.foreachRDD之
前,那么Connection的获取动作将发生在Driver端,然后通过序列化的方式发送到各个Worker(Connection的序列化通常是无法正确序列化的)
3. Connection的获取在foreachRDD的参数中获取,同时还要在遍历RDD之前获取(调用RDD的foreach方法前获取),如果遍历中获取,那么RDD中的每个record都要打开关闭连接,这对于数据库连接资源将是极大的考验
4. 业务逻辑处理定义在func中,它是在foreachRDD的方法参数体中定义的,如果把func的定义放到外面,即Driver中,貌似也是可以的,Spark会对计算方法通过Broadcast进行广播到各个计算节点。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:6963次
排名:千里之外
原创:21篇14:53 提问
spark streaming实时分析去重问题
spark streaming实时分析处理时,处理的数据可能会出现重复,需要根据唯一的key进行处理,谁知道怎么处理
按赞数排序
----------------------同志你好,我是CSDN问答机器人小N,奉组织之命为你提供参考答案,编程尚未成功,同志仍需努力!4840人阅读
sparkstreaming
最近在用sparkstreaming的技术来实现公司实时号码热度排序,学习了一下sparkstreaming的相关技术,今天主要要讲一个简单sparkstreaming实时数据流技术的一个示例,帮助大家更好的理解和学习sparkstreaming编程原理。
在开始实例之前我们简单的了解一下sparkstreaming的原理:具体参见:/blog/2248368
Spark Streaming&是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,包括Kafk、Flume、Twitter、ZeroMQ、Kinesis&以及TCP sockets,从数据源获取数据之后,可以使用诸如map、reduce、join和window等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统,数据库和现场仪表盘。在“One Stack rule them
all”的基础上,还可以使用Spark的其他子框架,如集群学习、图计算等,对流数据进行处理。
Spark Streaming处理的数据流图:
Spark的各个子框架,都是基于核心Spark的,Spark Streaming在内部的处理机制是,接收实时流的数据,并根据一定的时间间隔拆分成一批批的数据,然后通过Spark Engine处理这些批数据,最终得到处理后的一批批结果数据。
对应的批数据,在Spark内核对应一个RDD实例,因此,对应流数据的DStream可以看成是一组RDDs,即RDD的一个序列。通俗点理解的话,在流数据分成一批一批后,通过一个先进先出的队列,然后&Spark Engine从该队列中依次取出一个个批数据,把批数据封装成一个RDD,然后进行处理,这是一个典型的生产者消费者模型,对应的就有生产者消费者模型的问题,即如何协调生产速率和消费速率。
Spark Streaming实时计算框架
&&&&Spark是一个类似于MapReduce的分布式计算框架,其核心是弹性分布式数据集,提供了比MapReduce更丰富的模型,可以在快速在内存中对数据集进行多次迭代,以支持复杂的数据挖掘算法和图形计算算法。Spark Streaming是一种构建在Spark上的实时计算框架,它扩展了Spark处理大规模流式数据的能力。
Spark Streaming的优势在于:
能运行在100+的结点上,并达到秒级延迟。使用基于内存的Spark作为执行引擎,具有高效和容错的特性。能集成Spark的批处理和交互查询。为实现复杂的算法提供和批处理类似的简单接口。
基于云梯Spark on Yarn的Spark Streaming总体架构如图1所示。其中Spark on Yarn的启动流程我的另外一篇文章(《程序员》2013年11月期刊《深入剖析阿里巴巴云梯Yarn集群》)有详细描述,这里不再赘述。Spark on Yarn启动后,由Spark AppMaster把Receiver作为一个Task提交给某一个Spark Executor;Receive启动后输入数据,生成数据块,然后通知Spark AppMaster;Spark
AppMaster会根据数据块生成相应的Job,并把Job的Task提交给空闲Spark Executor 执行。图中蓝色的粗箭头显示被处理的数据流,输入数据流可以是磁盘、网络和HDFS等,输出可以是HDFS,数据库等。
图1 云梯Spark Streaming总体架构
Spark Streaming的基本原理是将输入数据流以时间片(秒级)为单位进行拆分,然后以类似批处理的方式处理每个时间片数据,其基本原理如图2所示。
图2&Spark Streaming基本原理图
首先,Spark Streaming把实时输入数据流以时间片Δt (如1秒)为单位切分成块。Spark Streaming会把每块数据作为一个RDD,并使用RDD操作处理每一小块数据。每个块都会生成一个Spark Job处理,最终结果也返回多块。
下面介绍Spark Streaming内部实现原理。
使用Spark Streaming编写的程序与编写Spark程序非常相似,在Spark程序中,主要通过操作RDD(Resilient Distributed Datasets弹性分布式数据集)提供的接口,如map、reduce、filter等,实现数据的批处理。而在Spark Streaming中,则通过操作DStream(表示数据流的RDD序列)提供的接口,这些接口和RDD提供的接口类似。图3和图4展示了由Spark Streaming程序到Spark
jobs的转换图。
图3&Spark Streaming程序转换为DStream Graph
图4&DStream Graph转换为Spark jobs
在图3中,Spark Streaming把程序中对DStream的操作转换为DStream Graph,图4中,对于每个时间片,DStream Graph都会产生一个RDD Graph;针对每个输出操作(如print、foreach等),Spark Streaming都会创建一个Spark action;对于每个Spark action,Spark Streaming都会产生一个相应的Spark job,并交给JobManager。JobManager中维护着一个Jobs队列,
Spark job存储在这个队列中,JobManager把Spark job提交给Spark Scheduler,Spark Scheduler负责调度Task到相应的Spark Executor上执行。
Spark Streaming的另一大优势在于其容错性,RDD会记住创建自己的操作,每一批输入数据都会在内存中备份,如果由于某个结点故障导致该结点上的数据丢失,这时可以通过备份的数据在其它结点上重算得到最终的结果。
正如Spark Streaming最初的目标一样,它通过丰富的API和基于内存的高速计算引擎让用户可以结合流式处理,批处理和交互查询等应用。因此Spark Streaming适合一些需要历史数据和实时数据结合分析的应用场合。当然,对于实时性要求不是特别高的应用也能完全胜任。另外通过RDD的数据重用机制可以得到更高效的容错处理。
了解了sparkstreaming的工作原理后,我们来开始我们的实时处理实例编程吧
首先我们要做一个日志生产器,方便本地模拟线上环境:
直接上代码吧(原理是根据一个原始日志log,然后随机的从中挑选行添加到新生产的日志中,并且生产的数据量呈不断的增长态势)
import java.io._
import java.text.SimpleDateFormat
import org.apache.spark.{SparkConf, SparkContext}
import java.util.Date
import java.io.{PrintWriter}
import scala.io.Source
import scala.util.matching.Regex
object FileGenerater {
def main(args: Array[String]) {
while (i&100 )
val filename = args(0)
val lines = Source.fromFile(filename).getLines.toList
val filerow = lines.length
val writer = new PrintWriter(new File(&/Users/mac/Documents/workspace/output/sparkstreamingtest&+i+&.txt& ))
while(j&i)
writer.write(lines(index(filerow)))
println(lines(index(filerow)))
writer.close()
Thread sleep 5000
log(getNowTime(),&/Users/mac/Documents/workspace/output/sparkstreamingtest&+i+&.txt generated&)
def log(date: String, message: String)
println(date + &----& + message)
* 从每行日志解析出imei和logid
def index(length: Int) = {
import java.util.Random
val rdm = new Random
rdm.nextInt(length)
def getNowTime():String={
val now:Date = new Date()
val datetimeFormat:SimpleDateFormat = new SimpleDateFormat(&yyyy-MM-dd hh:mm:ss&)
val ntime = datetimeFormat.format( now )
* 根据时间字符串获取时间,单位(秒)
def getTimeByString(timeString: String): Long = {
val sf: SimpleDateFormat = new SimpleDateFormat(&yyyyMMddHHmmss&)
sf.parse(timeString).getTime / 1000
下面给出我们程序的configuration:
zhangfusheng.txt内容如下:
安徽 宿州市 汽车宿州分公司 王红岩 38
浙江 嘉兴市 汽车海宁分公司 金韩伟 03
安徽 滁州市 汽车滁州分公司 严敏 03
湖北 武汉市 汽车湖北汽车服务分公司 张晴 70
安徽 淮北市 汽车淮北分公司 李亚 84
安徽 滁州市 汽车滁州分公司 王旭东 174
安徽 淮南市 汽车淮南分公司 尹芳 085
湖北 省直辖行政单位 汽车仙桃分公司 汤黎 38
湖北 null 汽车潜江分公司 朱疆振 9
安徽 宣城 汽车宣城分公司 李倩 1
江苏 徐州 丰县分公司 李萍
9340 归属地
安徽 滁州市 汽车滁州分公司 阚家萍 0
广东 中山 汽车服务中心 农小萍 5 归属地
湖北 孝感 汽车孝感分公司 黄燕平 95 归属地
安徽 芜湖 null 邹恒清
8537 归属地
江西 null 汽车江西分公司产品事业部(汽车服务分公司、互联网安全管理中心) 张凯
安徽 淮南市 汽车淮南分公司 李磊 9
湖北 省直辖行政单位 汽车仙桃分公司 朱艳 25
浙江 温州 汽车温州分公司(本部) 吴玉春 29 归属地
安徽 淮北市 汽车淮北分公司 魏薇 3
湖北 省直辖行政单位 汽车仙桃分公司 王雪纯 5
湖北 宜昌市 汽车宜昌分公司 刘丽娟 69
湖北 武汉市 汽车湖北汽车服务分公司 陶劲松 09
安徽 淮北 汽车合肥分公司 刘洁 08 归属地
湖北 null 宜昌电信公司 鲜艳 0
安徽 淮北市 汽车淮北分公司 钱玉 37
湖北 武汉市 汽车湖北汽车服务分公司 谢真华 57
安徽 null 马鞍山公司 张颖 10
安徽 芜湖市 汽车芜湖分公司 许丽丽 94
安徽 合肥市 汽车合肥分公司 杨华丽 6
安徽 铜陵市 汽车铜陵分公司 黄琳 65
安徽 马鞍山 汽车马鞍山分公司 林花 7
贵州 null 汽车贵州分公司10000号运营中心 陈宣宏 21
安徽 合肥市 汽车合肥分公司 黄乐 71
安徽 淮南市 汽车淮南分公司 赵乃艳 63
湖北 武汉市 汽车湖北汽车服务分公司 蔡蕾 18
湖北 null 汽车潜江分公司 陈晓辉 6
安徽 马鞍山市 汽车马鞍山分公司 陈凤 6
安徽 合肥市 汽车合肥分公司 李大燕 6
我先来观察一下运行结果:
最后我们就开始coding sparkstreaming的部分代码:(主要要添加scala-sdk-2.10.6和spark-assembly-1.6.2-hadoop2.6.0等jar包)/**
* Created by mac on 16/8/12.
import org.apache.spark.SparkConf
import org.apache.spark.streaming._;
object SparkStreaming {
def main(args: Array[String]) {
//开本地线程两个处理,local[4]:意思本地起4个进程运行,setAppName(&SparkStreaming&):设置运行处理类
val conf = new SparkConf().setMaster(&local[4]&).setAppName(&SparkStreaming&)
//每隔5秒计算一批数据local[4]:意思本地起4个进程运行,setAppName(&SparkStreaming&):设置运行处理类
val ssc = new StreamingContext(conf, Seconds(5))
// 指定监控的目录
val lines = ssc.textFileStream(&/Users/mac/Documents/workspace/output&)
//按\t 切分输入数据
val words = lines.flatMap(_.split(&\t&))
//计算wordcount
val pairs = words.map(word =& (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
//排序结果集打印,先转成rdd,然后排序true升序,false降序,可以指定key和value排序_._1是key,_._2是value
val sortResult = wordCounts.transform(rdd =& rdd.sortBy(_._2, false))
sortResult.print()
ssc.start() // 开启计算
ssc.awaitTermination() // 阻塞等待计算
从结果可以看出,sparkstreaming每次会将设置的时间分片以内发生的增量日志进行一次批量处理,最终输出这个增量处理的结果。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:63266次
排名:千里之外
原创:22篇
评论:10条
(2)(1)(4)(17)&&&&频道:| |
> 基于Spark的数据库增量准实时同步
基于Spark的数据库增量准实时同步
为了实现将传统关系型数据库中的增量数据快速导入同构或者异构目的库,在使用已有的增量提取方法的基础上,提
时间: 03:58 来源:网络整理 作者:Ioter 点击:次
  王浩,葛昂,赵晴  (华北计算机系统工程研究所,北京 100083)摘要:为了实现将传统关系型数据库中的增量数据快速导入同构或者异构目的库,在使用已有的增量提取方法的基础上,提出了通过增加并行度和流式计算的方法加快同步速度。此方法不仅支持插入、更新和删除的增量数据同步,而且可以抽取出数据库表结构信息动态支持表结构变更。与传统单点抽取方式相比,大大提高了目的库数据的新鲜度。  关键词:增量同步; S 流式计算0引言  随着技术的发展,越来越多的企业开始构建大数据平台进行数据处理。然而如何将保存在关系型数据库中的数据快速同步到大数据平台组件(例如HBase、HDFS)中,正成为很多企业面临的问题。Sqoop是常用的数据同步工具,其实质是MapReduce任务,延时较高,而且需要通过定时任务来达到流程效果。本文在触发器记录数据变化的基础上,提出了一种使用Spark Streaming将增量数据抽取出来,然后根据需要写入到不同的目的库的方法。由于只提取增量数据,所以较Sqoop减少了数据量。另外由于是流式处理方式,降低了延时。1增量提取  1.1增量提取的概念  增量提取是针对上一次提取而言,将上一次提取时间点到现在数据库中插入、更新、删除的数据提取出来[1]。  1.2常用的增量提取方法  1.2.1基于业务系统日志  在业务中将数据库DML(Data Manipulation Language)语句输出以日志的方式存储,然后通过解析日志将DML语句在目的库中重放以达到目的。此方法需要侵入业务系统,对于已经成型的业务系统不适用。  1.2.2基于数据库日志  解析数据库日志也能达到增量提取的目的,但是各大数据库厂商不对外开放数据库系统的日志格式,这就使得解析日志变成了问题。而且各数据库的日志格式还不尽相同,难以达到通用性。  1.2.3基于触发器  基于触发器的方式,目前被广泛运用于数据库增量提取。它通过在源表上建立插入、更新、删除触发器来记录对数据的操作。每当有数据变化时,就会触发相应的触发器,然后运行触发器定义的逻辑,将变化记录到增量表。  1.3基于触发器方法的具体实现  由于触发器方法具有实现逻辑简单,对业务无入侵,数据库通用等优点,所以本文采用了基于触发器方式的增量提取方法。具体实现方法如下:  (1)创建名为dml_log的数据库表,字段为id、table_name、record_id、execute_date、dml_type。其中id为自增id,table_name存储要同步的源表表名称,record_id是源表中发生变化的记录的唯一标识,execute_date为触发器执行时的时间戳,dml_type为I、U、D分别代表insert、update、delete操作。  (2)在源表上创建插入、更新、删除类型的触发器。创建语句在此省略。2构建Spark Streaming程序  2.1Spark Streaming  Spark是目前大数据处理领域比较常用的计算框架。它将中间计算结果维护在内存中,这样不仅可以做到中间结果的重用,而且减少了磁盘IO,大大加快了计算速度。Spark Streaming是构建于Spark core之上的流式处理模块。其原理是将流式数据切分成一个个小的片段,以mini batch的形式来处理这一小部分数据,从而模拟流式计算达到准实时的效果。  2.2JdbcRDD  弹性分布式数据集(Resilient Distributed Datasets,RDD),它是Spark数据抽象的基石。RDD是一个只读的分区记录集合,分区分散在各个计算节点[2]。RDD提供了transformation和action两类操作,其中transformation是lazy级别的,主要对数据处理流程进行标记,而不立即进行运算。action操作会触发作业的提交,然后进行回溯导致transformation操作进行运算。  JdbcRDD扩展自RDD,是RDD的子类。内部通过JDBC(Java Data Base Connectivity)操作以数据库为源头构建RDD。其构造函数签名为:  class JdbcRDD[T: ClassTag](  sc: SparkContext,  getConnection:()=& Connection,  sql: String,  lowerBound: Long,  upperBound: Long,  numPartitions: Int,  mapRow:(ResultSet) =& T =  JdbcRDD.resultSetToObjectArray _)  extends RDD[T](sc, Nil) with Logging {…}  2.3具体实现  Spark官方提供用于构建Spark Streaming的数据源没有对数据库进行支持,所以本文自己实现对数据库的支持。编写继承自InputDStream类的DirectJdbcInputDStream类,其签名为:  class DirectJdbcInputDStream[T: ClassTag](  @transient ssc_ : StreamingContext,  param: JdbcParam) extends  InputDStream[Row] (ssc_) with Logging {…}  对start()、compute()和stop()方法进行重写。  (1)在start函数中注册JDBC驱动,用于JDBC获取初始化信息(构造JdbcRDD时的参数);
(责任编辑:ioter)
本文链接:
http://www./IC/tech/47.html
声明:物联网在线转载作品均尽可能注明出处,该作品所有人的一切权利均不因本站转载而转移。作者如不同意转载,即请予以删除或改正。转载的作品可能在标题或内容上或许有所改动。
TopeWay Business Media
Copyright (C) 2011 . 本网站所有内容均受版权保护。
未经版权所有人明确的书面许可,不得以任何方式或媒体翻印或转载本网站的部分或全部内容。

我要回帖

更多关于 spark streaming原理 的文章

 

随机推荐