streamsets怎么将mysql的驱动jar包jar放上去

将指定kafka的一个topic的数据实时入库到elasticsearch中。
kafka:0.10.2
elasticsearch:5.6.1
Origins:Kafka Consumer
Destiations:ElasticSearch
3.2、注意事项
1、若只读取一个topic的数据,origins使用Kafka Consumer;若要同时读取多个topic,则使用Kafka Multitopic。
2、kafka若要实现读取历史数据,要添加auto.offset.reset=earliest
3.3.1、Origins配置
(1)选择源
Origins选择Kafka Consumer
Data Format
3.3.2、Destiations配置
(1)选择目的
Destiations选择ElasticSearch
ElasticSearch
3.3.3、Pipeline
Error Records
一 、环境搭建
1.环境准备
maven 3.23+
下载地址 http://maven.apache.org/download.cgi
1、支持多种安装方式1.1、核心安装包(Core Tarball)该安装包包含核心的SDC软件,使该软件具有最小的软件连接器集合,当然你可以手动下载额外的节点(Stage)①通过Streamsets的...
1、介绍: StreamSets数据操作平台是唯一旨在简化如何构建,执行和操作企业数据流的平台。构建在开源核心上,开发人员可以轻松构建批处理和流式数据流,而且代码少,而运营商使用云本地产品将数十或数百...
实例百度网盘下载(永久有效):
链接: https://pan.baidu.com/s/1ltXeOlMEMYyfJD6eplUg1A 密码: 3ii5
实例下载:
streamsets使...
最近在研究StreamSets,因为它官网的标题就是处理复杂数据流,就想试一下,做了几个简单Demo之后,发现从传统关系型数据库到Hbase貌似经过很简单的操作就可以做到实时的数据采集:
一、环境说明开发环境需要如下软件:smartgit 17.0.4
下载地址:http://www.syntevo.com/smartgit/welcomeapache-ant-1.10.0-b...
1、支持多种安装方式1.1、核心安装包(Core Tarball)该安装包包含核心的SDC软件,使该软件具有最小的软件连接器集合,当然你可以手动下载额外的节点(Stage)①通过Streamsets的...
什么是StreamSets数据收集器?StreamSets 数据收集器是一个轻量级,强大的引擎,实时流数据。使用Data Collector在数据流中路由和处理数据。要为Data Collector定...
streamsets在编译时的那些坑1 问题描述commandLine 'docker', 'manifest', 'inspect', 'alpine'& Configure proj...
Streamsets相关资料汇总 1、Streamsets官网介绍https://streamsets.com/ Github:https://github.com/search?utf8=%E2%9...
没有更多推荐了,centos7通过yum安装mysql,并授权远程连接 - 荣锋亮 - 博客园
随笔 - 876, 文章 - 175, 评论 - 28, 引用 - 0
&安装:CentOS 7的yum源中没有正常安装的mysql-sever文件,需要去官网上下载(通过安装mysql的yum容器,再通过yum安装mysql)注:安装前,需要卸载所有的mariadb软件(完全兼容mysql的另一个数据库,mysql原创者所写),通过命令yum remove mariadb*获取yum地址:安装mysql的yum容器:其实就是在/etc/yum.repo.d/ &下添加了两个容器档案:& & 查看那个yum容器有什么mysql软件(yum install mysql-这里再按两次Tab键,注:使用yum时,yum 会先下载容器的清单到本机的 /var/cache/yum 里面去,但mysql的容器是刚添加的,/var/cache/yum 里面还没下载,所以yum install mysql-这里再按两次Tab键,应该不会出现下面画面,所以可以先输入:yum list mysql-community-client,加载容器清单到/var/cache/yum,在输入yum install mysql-这里再按两次Tab键):我这里安装了这些:安装完后启动mysql:设置密码:授权远程访问:登陆:使用mysql数据库(真正的数据库,而非数据库软件),将所有数据库的所有表(*.*)的所有权限(all privileges),授予通过任何ip(%)访问的root用户,密码为123456,最后刷新(flush privileges)即可。开放防火墙端口:通过vim修改/etc/sysconfig/iptables,添加一行(这里是为了简单添加一行,更多防火墙知识请自行学习):重启防火墙:在windows下,我用 navicat测试:远程连接成功。import java.sql.{PreparedStatement, ResultSet}
import com.alibaba.fastjson.JSON
import kafka.serializer.StringDecoder
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import we.com.util.{KafkaManager, MysqlPoolUtils}
* @author yangxin
* 解析数据库binLog日志到Mysql数据库
* 基本原理:
* 1.读取日志解析对应的数据表的Log操作,先正则每条日志,找到匹配到的原始日志
* 2.根据判断表中的主键或者关键字段选择是否解析或者跳过当前的日志
* 3.将匹配到包含关键字段的日志数据,获取对数据操作的方式insert、update
* 4.然后再去获取每个字段的含有,再将要执行的SQL语句加到批处理
* 5.执行批处理,同步数据到数据库
object BinLogToMysql {
private val appName = "BinLogToMysqlService"
private val LOG = Logger.getLogger(appName)
// SparkStreaming
private val master = "yarn"
// 数据表的正则表达式
private val accountRegex = """(.*"tableName":"account".*)""".r
// 处理RDD操作
def processRdd(rdd: RDD[(String, String)]): Unit = {
rdd.foreachPartition {
partition =& {
if (partition.nonEmpty) {
val conn = MysqlPoolUtils.getConnection.get
val rs: ResultSet = null
var insertSQL = "INSERT INTO rt_account(fundAcc, is_lender, idcard_info_id) VALUES (?,?,?)"
var updateSQL = "update ...."
val stmt1 = conn.prepareStatement(insertSQL)
val stmt2 = conn.prepareStatement(updateSQL)
stmt1.addBatch()
partition.foreach { line =&
val lineVal = line._2
lineVal match {
case accountRegex(accountJson) =& {
val accountObj = JSON.parse(accountJson)
accountObj match {
// 匹配主要关键的字段、主键是否存在
case Some(mapStr: Map[String, Any]@unchecked) if mapStr.contains("id") && mapStr.contains("version") =& {
val pointEventType = mapStr.getOrElse("dataEventType", "").toString
if (null != pointEventType && "insert".equals(pointEventType)) {
// addAccountData(stmt1, mapStr)
// 解析mapStr JSON字符串,减将对应的key的value放到对应SQL中的位置
stmt1.addBatch()
} else if (null != pointEventType && "update".equals(pointEventType)) {
// addAccountData(stmt2, mapStr)
// 解析mapStr JSON字符串,减将对应的key的value放到对应SQL中的位置
stmt2.addBatch()
case other =& {
LOG.error("Data Struct Error")
case None =& {
LOG.error("DataParser failded")
stmt1.executeBatch()
stmt2.executeBatch()
// 关闭数据库连接操作
def close(resultSet: ResultSet, stmts: PreparedStatement*): Unit = {
for (stmt &- stmts) {
if (resultSet != null)
resultSet.close()
if (stmt != null)
stmt.close()
// 程序的run方法
def run(ssc: StreamingContext, brokers: String, topics: String): Unit = {
val kafkaParams = Map[String, String](
"metadata.broker.list" -& brokers,
"group.id" -& "BinLogToMysqlService",
"auto.offset.reset" -& "offsetStrategies"
val topicSet = topics.split(",").toSet
val kafkaManager = new KafkaManager(kafkaParams)
val messagesStream = kafkaManager.createDirectStream[String, String, StringDecoder, StringDecoder](
ssc, kafkaParams, topicSet
messagesStream.asInstanceOf[InputDStream[(String, String)]].foreachRDD(
processRdd(rdd)
kafkaManager.updateZKOffsets(rdd)
// 程序的主入口Main方法
def main(args: Array[String]): Unit = {
if (args.length & 2) {
LOG.error(
s"""Usage: KafkaBinLogToMysql &brokers& &topics&
&brokers& is a list of one or more kafka brokers
&topics& is a list of one or more kafka topics split by "," to consume
""".stripMargin)
System.exit(1)
// 开启Spark自动使用系统负载选择最优消费速率
val sc = new SparkConf().set("spark.streaming.backpressure.enabled", "true").setAppName(appName).setMaster(master)
// 设置每批处理数据时间间隔
val ssc = new StreamingContext(sc, Seconds(10))
// 接受的brokers,topics参数
val Array(brokers, topics) = args
run(ssc, brokers, topics)
// 开始SparkStreaming实时程序
ssc.start()
// 设置自动的停止,保证数据接受完全
val checkInterValMillisc = 20000
var isStopped:Boolean = false
while (!isStopped) {
isStopped = ssc.awaitTerminationOrTimeout(checkInterValMillisc)
if (!isStopped) {
ssc.stop(stopSparkContext=true, stopGracefully=true)
kafka-&spark-&streaming-&mysql(scala)实时数据处理示例
由于业务的发展,一些实时统计的需求越来越多。有些东西通过记录日志然后实时分析日志可以解决。但是对于有入库还有自己记录到日志的这显然是多此一举。因为MySQL本身就有帮你记录日志, 而且记录...
本文采用Maxwell来实现实时解析mysql的binlog日志发送至kafka
1、开启mysql binlog
环境中mysql是docker容器,所以需要进入容器修改mysql配置....
在双十一这样的节日,很多电商都会在大屏幕上显示实时的订单总量和GMV总额。由于订单数量巨大,不可能每隔一秒就到数据库里进行一次SQL的数据统计,这时候就需要用到流式计算。本文将介绍一个...
本例子是我初尝 spark 的sparkStreaming官方小例子修改的。我的思路是使用jdbc 链接数据库,然后查询数据库,将查询结果生成一个RDD ,放入RDD queue,然后每次取出rdd ...
本文的列子来自http://blog.csdn.net/zfszhangyuan/article/details/,部分内容做了相应的修改和添加
首先我们要做一个日志生产器,...
业务架构:
JavaScript -& Netty -& Kafka -& Spark Streaming + Hive -& Redis -& PHP
1.JavaScript作为统计脚本发送后...
sparkStream实际上就是为实时操作生成的数据提供服务的。
下面给大家介绍:通过监控虚拟机9999端口,当那边输入单词时,这边会对它进行实时的一个单词计数,并将结果存入到hdfs.
一,过程...
现在准备做mysql实时同步数据到kudu,为以后的实时即席查询分析做数据支撑,kudu+impala速度还是挺快的。
因为实时性要求比较高,而且需要同步的时候对mysql的压力不能太大,不然会影响...
没有更多推荐了,摘要: 由于公司业务上的需求,需要实时监控mysql数据库的数据的增长,并将数据同步到另一个平台,所以就问老大使用什么工具比较好,老大推荐使用StreamSets,还说在测试环境都已经部署好了StreamSets,所以就开始写了第一个测试。 数据库版本:5.6.35 MySQL Community Serv
18:49 猿起缘灭 阅读(22) 评论(0)
摘要: 由于公司业务上的需求,需要实时监控mysql数据库的数据的增长,并将数据同步到另一个平台,所以就问老大使用什么工具比较好,老大推荐使用StreamSets,还说在测试环境都已经部署好了StreamSets,所以就开始写了第一个测试。 数据库版本:5.6.35 MySQL Community Serv
18:15 猿起缘灭 阅读(2) 评论(0) &随笔分类 - StreamSet

我要回帖

更多关于 streamsets多表导入 的文章

 

随机推荐