请教kafka 启用多台kafka 0.9 consumerr 问题

kafka topic 问题
各位为好:
就是kafka将topic分为多个区,然后将区分布在多个sever上;如果将10个topic,每个topic分1个区。那么这个topic的10个区会分布在不同的server上吗?还是会集中在1,2台server上?如果分为多个区的话,我知道是会均匀分布的;
另外每个区是在那个server上怎么知道?owner 只是告诉了是消费者在消费当前分区,不知道当前分区在那个server上。
我也在研究这个 有空一起研究下
对KAFKA高可用表示怀疑
各个参数的含义:
1 &--zk: zookeeper的地址
2 &--prot 端口号
3 &--refresh 刷新频率,更新到DB。
4 &--retain 保留DB的时间
5 & 设置dbName在哪里存储记录(默认'offsetapp')
更详细的你可以看这篇文章呀:/54
Spark+Kafka实时流机器学习实战
课程观看地址:
课程出自学途无忧网:
一、课程使用到的软件及版本
①Spark1.6.2
②kafka0.8.2.1
③centos6.5
二、课程适合人群
适合想学kafka,Spark实时流计算,Spark机器学习的学员
三、课程目标
①学好Kafka,及其整个架构实现原理
②熟练运用Spark机器学习
③熟练掌控Spark与Kafka结合,实现实时流计算
四、课程目录
第1课、spark与kafka的介绍
第2课、spark的集群安装
第3课、Spark RDD函数讲解与实战分析
第4课、Spark 的java操作实现简单程序
第5课、SparkRDD原理详细剖析
第6课、Spark 机器学习,API阅读
第7课、Kafka架构介绍以及集群安装
第8课、Kafka生产者Producer的实战
第9课、Kafka消费者Consumer剖析与实战
第10课、Kafka复杂消费者的详细讲解
第11课、Kafka数据安全,以及Spark Kafka Streaming API
第12课、Spark+Kafka+Mysql整合
第13课、Spark 机器学习ALS设计
第14课、Spark ALS协同过滤java实战
第15课、Spark ALS给用户推荐产品
第16课、Spark机器学习后存储到Mysql
第17课、Spark读取Kafka流构建Als模型
第18课、Spark处理Kafka流构建Als模型&
第19课、Spark处理Kafka流实现实时推荐算法
第20课、Spark学习经验总结,spark2与spark1的区别,下期预告
推荐组合学习:《深入浅出Spark机器学习实战(用户行为分析)》
课程观看地址:求教各位:
我有一个topic,两个Topology,都需要消费该topic的数据,好像需要设置不同的consumer group才能支持,
请问程序中如何设置参数才能支持呢
主题帖子积分
高级会员, 积分 2029, 距离下一级还需 2971 积分
高级会员, 积分 2029, 距离下一级还需 2971 积分
&&Consumers
& & 本质上kafka只支持Topic.每个consumer属于一个反过来说,每个group中可以有多个consumer.发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费.
& & 如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.
& & 如果所有的consumer都具有不同的group,那这就是&发布-订阅&;消息将会广播给所有的消费者.
& & 在kafka中,一个partition中的消息只会被group中的一个consumer消费;每个group中consumer消息消费互相独立;我们可以认为一个group是一个&订阅&者,一个Topic中的每个partions,只会被一个&订阅者&中的一个consumer消费,不过一个consumer可以消费多个partitions中的消息.kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的.事实上,从Topic角度来说,消息仍不是有序的.
& & kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息.
##############################################
consumer端部署及API
1、consumer.properties:文件位于/resources目录下
[Bash shell] 纯文本查看 复制代码zookeeper.connect=192.168.0.1:2181test-datacenter/test-server
# timeout in ms for connecting to zookeeper
zookeeper.connectiontimeout.ms=1000000
#consumer group id
group.id=test-group
#consumer timeout
#consumer.timeout.ms=5000
2、JAVA API实现
[Java] 纯文本查看 复制代码import java.io.UnsupportedEncodingE
import java.util.L
import java.util.P
import java.util.concurrent.TimeU
import kafka.consumer.*;
import kafka.javaapi.consumer.ConsumerC
import kafka.message.MessageAndM
import mons.collections.CollectionU
public class kafkaConsumer {
public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {
Properties properties = new Properties();
properties.put(&zookeeper.connect&, &192.168.0.1:2181/test-datacenter/test-server&);
properties.put(&mit.enable&, &true&);
properties.put(&mit.interval.ms&, &60000&);
properties.put(&group.id&, &test&);
ConsumerConfig consumerConfig = new ConsumerConfig(properties);
ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);
//topic的过滤器
Whitelist whitelist = new Whitelist(&test&);
List&KafkaStream&byte[], byte[]&& partitions = javaConsumerConnector.createMessageStreamsByFilter(whitelist);
if (CollectionUtils.isEmpty(partitions)) {
System.out.println(&empty!&);
TimeUnit.SECONDS.sleep(1);
//消费消息
for (KafkaStream&byte[], byte[]& partition : partitions) {
ConsumerIterator&byte[], byte[]& iterator = partition.iterator();
while (iterator.hasNext()) {
MessageAndMetadata&byte[], byte[]& next = iterator.next();
System.out.println(&partiton:& + next.partition());
System.out.println(&offset:& + next.offset());
System.out.println(&message:& + new String(next.message(), &utf-8&));
主题帖子积分
consumer基本配置如下:& && & group.id& && & zookeeper.connect
PropertyDefaultDescriptiongroup.id 用来唯一标识consumer进程所在组的字符串,如果设置同样的group&&id,表示这些processes都是属于同一个consumer&&groupzookeeper.connect 指定zookeeper的连接的字符串,格式是hostname:port,此处host和port都是zookeeper server的host和port,为避免某个zookeeper 机器宕机之后失联,你可以指定多个hostname:port,使用逗号作为分隔:
hostname1:port1,hostname2:port2,hostname3:port3
可以在zookeeper连接字符串中加入zookeeper的chroot路径,此路径用于存放他自己的数据,方式:
hostname1:port1,hostname2:port2,hostname3:port3/chroot/pathconsumer.idnull不需要设置,一般自动产生socket.timeout.ms30*100网络请求的超时限制。真实的超时限制是& &max.fetch.wait+socket.timeout.mssocket.receive.buffer.bytes64*1024socket用于接收网络请求的缓存大小fetch.message.max.bytes每次fetch请求中,针对每次fetch消息的最大字节数。这些字节将会督导用于每个partition的内存中,因此,此设置将会控制consumer所使用的memory大小。这个fetch请求尺寸必须至少和server允许的最大消息尺寸相等,否则,producer可能发送的消息尺寸大于consumer所能消耗的尺寸。num.consumer.fetchers1用于fetch数据的fetcher线程数<mit.enabletrue如果为真,consumer所fetch的消息的offset将会自动的同步到zookeeper。这项提交的offset将在进程挂掉时,由新的consumer使用<mit.interval.ms60*1000consumer向zookeeper提交offset的频率,单位是秒queued.max.message.chunks2用于缓存消息的最大数目,以供consumption。每个chunk必须和fetch.message.max.bytes相同rebalance.max.retries4当新的consumer加入到consumer&&group时,consumers集合试图重新平衡分配到每个consumer的partitions数目。如果consumers集合改变了,当分配正在执行时,这个重新平衡会失败并重入fetch.min.bytes1每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。fetch.wait.max.ms100如果没有足够的数据能够满足fetch.min.bytes,则此项配置是指在应答fetch请求之前,server会阻塞的最大时间。rebalance.backoff.ms2000在重试reblance之前backoff时间refresh.leader.backoff.ms200在试图确定某个partition的leader是否失去他的leader地位之前,需要等待的backoff时间auto.offset.resetlargestzookeeper中没有初始化的offset时,如果offset是以下值的回应:
smallest:自动复位offset为smallest的offset
largest:自动复位offset为largest的offset
anything&&else:向consumer抛出异常consumer.timeout.ms-1如果没有消息可用,即使等待特定的时间之后也没有,则抛出超时异常exclude.internal.topicstrue是否将内部topics的消息暴露给consumerparitition.assignment.strategyrange选择向consumer 流分配partitions的策略,可选值:range,roundrobinclient.idgroup id value是用户特定的字符串,用来在每次请求中帮助跟踪调用。它应该可以逻辑上确认产生这个请求的应用zookeeper.session.timeout.ms6000zookeeper 会话的超时限制。如果consumer在这段时间内没有向zookeeper发送心跳信息,则它会被认为挂掉了,并且reblance将会产生zookeeper.connection.timeout.ms6000客户端在建立通zookeeper连接中的最大等待时间zookeeper.sync.time.ms2000ZK follower可以落后ZK leader的最大时间offsets.storagezookeeper用于存放offsets的地点: zookeeper或者kafkaoffset.channel.backoff.ms1000重新连接offsets channel或者是重试失败的offset的fetch/commit请求的backoff时间offsets.channel.socket.timeout.ms10000当读取offset的fetch/commit请求回应的socket 超时限制。此超时限制是被consumerMetadata请求用来请求offset管理mit.max.retries5重试offset commit的次数。这个重试只应用于offset&&commits在shut-down之间。他mit.enabledtrue如果使用“kafka”作为offsets.storage,你可以二次提交offset到zookeeper(还有一次是提交到kafka)。在zookeeper-based的offset&&storage到kafka-based的offset storage迁移时,这是必须的。对任意给定的consumer&&group来说,比较安全的建议是当完成迁移之后就关闭这个选项partition.assignment.strategyrange在“range”和“roundrobin”策略之间选择一种作为分配partitions给consumer 数据流的策略; 循环的partition分配器分配所有可用的partitions以及所有可用consumer&&线程。它会将partition循环的分配到consumer线程上。如果所有consumer实例的订阅都是确定的,则partitions的划分是确定的分布。循环分配策略只有在以下条件满足时才可以:(1)每个topic在每个consumer实力上都有同样数量的数据流。(2)订阅的topic的集合对于consumer&&group中每个consumer实例来说都是确定的。
更多细节可以查看&&scala类:&&kafka.consumer.ConsumerConfig
主题帖子积分
中级会员, 积分 497, 距离下一级还需 503 积分
中级会员, 积分 497, 距离下一级还需 503 积分
多谢楼上两位,
我现在用的java,我想了解的是:
两个Topology都是通过kafkaSpout从kafka抓取数据,都是作为一个Consumer,请问如何设置,
才能同时消费Topic,有实际的例子吗
主题帖子积分
高级会员, 积分 2672, 距离下一级还需 2328 积分
高级会员, 积分 2672, 距离下一级还需 2328 积分
多谢楼上两位,
我现在用的java,我想了解的是:
两个Topology都是通过kafkaSpout从kafka抓 ...
你这个问题,别人都不会碰到的,都是运用已有的知识,然后进行尝试
group.id用来唯一标识consumer进程所在组的字符串,如果设置同样的group&&id,表示这些processes都是属于同一个consumer&&group
上面比较很清楚了,你设置一下不同group.id试一下
积极上进,爱好学习
经常参与各类话题的讨论,发帖内容较有主见
经常帮助其他会员答疑
站长推荐 /4
云计算hadoop视频大全(新增 yarn、flume|storm、hadoop一套视频
等待验证会员请验证邮箱
新手获取积分方法
技术类问答,解决学习openstack,hadoop生态系统中遇到的问题
Powered by请教kafka 启用多台consumer 问题?
你好,想跟你请教个问题://多线程方式 && &public static void consumer(){ && &&&& Properties props = new Properties(); & && &&&& props.put(&zk.connect&, &hadoop-2:2181&); & && &&&& props.put(&zk.connectiontimeout.ms&, &1000000&); & && &&&& props.put(&groupid&, &fans_group&); & && &&&&& & && &&&& // Create the connection to the cluster & && &&&& ConsumerConfig consumerConfig = new ConsumerConfig(props); & && &&&& ConsumerConnector consumerConnector = Consumer.createJavaConsumerConnector(consumerConfig); & && &&&&& & && &&&& Map&String, Integer& map = new HashMap&String, Integer&(); && &&& &map.put(&fans&, 1); && &&& & && &&&& // create 4 partitions of the stream for topic “test”, to allow 4 threads to consume & && &&&& Map&String, List&KafkaStream&Message&&& topicMessageStreams = consumerConnector.createMessageStreams(map); & && &&&& List&KafkaStream&Message&& streams = topicMessageStreams.get(&fans&); & && &&&&& & && &&&& // create list of 4 threads to consume from each of the partitions& & && &&&& ExecutorService executor = Executors.newFixedThreadPool(1); & && &&&& long startTime = System.currentTimeMillis(); && &&&& // consume the messages in the threads & && &&&& for(final KafkaStream&Message& stream: streams) { & && &&&&&& executor.submit(new Runnable() { & && &&&&&&&& public void run() { & && &&&&&&& &&& & ConsumerIterator&Message& it = stream.iterator(); && &&&&&&&&&&&&&& while (it.hasNext()){ && &&&&&&&&&&& &&& && log.debug(byteBufferToString(it.next().message().payload())); && &&&&&&&&&&&&&& } && &&&&&&&&&& }
&& &&&&&&& & && &&&&&& });
&& &&&&&& log.debug(&use time=&+(System.currentTimeMillis()-startTime)); && &&&& } & && &}
/*** && & * ByteBuffer转换为String && & * @param buffer && & *
&& & */ && &public static String byteBufferToString(ByteBuffer buffer) { && &&& &CharBuffer charBuffer = && &&& &try { && &&& &&& &Charset charset = Charset.forName(&UTF-8&); && &&& &&& &CharsetDecoder decoder = charset.newDecoder(); && &&& &&& &charBuffer = decoder.decode(buffer); && &&& &&& &buffer.flip(); && &&& &&& &return charBuffer.toString(); && &&& &} catch (Exception ex) { && &&& &&& &ex.printStackTrace(); && &&& &&& & && &&& &} && &}
这是我的代码。单台没有问题,多台时ConsumerIterator&Message& it = stream.iterator();就这里等待了。难道启多台groupid要不一样?那如果我要取同一个topic中的数据怎么办?并且我没有分区push数据的情况下。
consumer消费的时候 &如果已经消费过的数据则不再消费,这个信息都是保存在consumer端的,所以如果你对一批数据要多次消费是要用不同的groupid的 &存的时候 根据不同的partition去存储 去的时候就可以针对同一个topic下的不同partition并行执行
同问啊,天杀的High Level Consumer取不到数据啊,上帝。。。。。。
N个consumer,就设置partitons的个数为N个,这样kafka会帮你分配。Kafka持久化至Hive,目前搜到大致有如下几种方案:
Kafka持久化至Hive,目前搜到大致有如下几种方案:
1、HiveKa&:&Apache&Hive's&storage&handler&that&adds&support&in&Apache&Hive&to&query&data&from&Apache&Kafka
/HiveKa/HiveKa
2、Confluent&Platform&-&HDFS&Connector
/post/kafka/kafkachi-jiu-hua-shu-ju-dao-hdfsde-fang-fa
http://docs.confluent.io/2.0.0/connect/connect-hdfs/docs/index.html
3、camus或gobblin
/thread-.html
请问下&大家平时用那个比较方便?
mq中的内容是随时变化的,向hive中导入不合适,不如从数据源直接使用sqoop导入。

我要回帖

更多关于 kafka 0.9 consumer 的文章

 

随机推荐