redisredis cli pipelinee 是做什么用的

刚刚接触scrapy,想让scrapy实现分布式爬取,发现还有个东西叫做scrapy-redis,请问二者却别是什么
不妖自来~我刚刚接触scrapy的时候,也看过这个项目,奈何对scrapy本身就不怎么熟悉,所以当时怎么也想不明白,直到后来开始看scrapy 的源代码,才渐渐明白。这里提一下我的看法,水平有限,不敢保证完全正确,欢迎指正。一、scrapy和scrapy-redis的主要区别在哪里?个人认为,scrapy和scrapy-redis不应该讨论区别。scrapy 是一个通用的爬虫框架,其功能比较完善,可以帮你迅速的写一个简单爬虫,并且跑起来。scrapy-redis是为了更方便地实现scrapy分布式爬取,而提供了一些以redis为基础的组件(注意,scrapy-redis只是一些组件,而不是一个完整的框架)。你可以这么认为,scrapy是一工厂,能够出产你要的spider。而scrapy-redis是其他厂商为了帮助scrapy工厂更好的实现某些功能而制造了一些设备,用于替换scrapy工厂的原设备。所以要想跑分布式,先让scrapy工厂搭建起来,再用scrapy-redis设备去更换scrapy的某些设备。那么这些scrapy-redis组件有什么突出特点呢?他们使用了redis数据库来替换scrapy原本使用的队列结构(deque),换了数据结构,那么相应的操作当然都要换啦,所以与队列相关的这些组件都做了更换。二、scrapy-redis提供了哪些组件?Scheduler、Dupefilter、Pipeline、Spider提供了哪些组件具体见。三、为什么要提供这些组件?这要从哪哪哪说起(喝口水,默默地望着远方......)我们先从scrapy的“待爬队列”和“Scheduler”入手:咱们玩过爬虫(什么玩过,是学习过,研究过,爱过也被虐过)的同学都多多少少有些了解,在爬虫爬取过程当中,有一个主要的数据结构是“待爬队列”,以及能够操作这个队列的调度器(也就是Scheduler啦)。scrapy官方文档对这二者的描述不多,基本上没提。scrapy使用什么样的数据结构来存放待爬取的request呢?其实没用高大上的数据结构,就是python自带的collection.deque,不过当然是改造过后的啦(后面所说到的deque均是指scrapy改造之后的队列,至于如何改造的,就去看代码吧)。详见源代码不过咱们用一用deque就会意识到,该怎么让两个以上的Spider共用这个deque呢?答案是,我水平不够,不知道。那分布式怎么实现呢,待爬队列都不能共享,还玩个泥巴呀。scrapy-redis提供了一个解决方法,把deque换成redis数据库,我们从同一个redis服务器存放要爬取的request,这样就能让多个spider去同一个数据库里读取,这样分布式的主要问题就解决了嘛。---------------------------------------------------分割线的随地大小便----------------------------------------------------------那么问题又来了,我们换了redis来存放队列,哪scrapy就能直接分布式了么?(当初天真的我呀~当然不能。我们接着往下说。scrapy中跟“待爬队列”直接相关的就是调度器“Scheduler”:,它负责对新的request进行入列操作(加入deque),取出下一个要爬取的request(从deque中取出)等操作。在scrapy中,Scheduler并不是直接就把deque拿来就粗暴的使用了,而且提供了一个比较高级的组织方法,它把待爬队列按照优先级建立了一个字典结构,比如:{priority0:队列0priority1:队列2priority2:队列2}然后根据request中的priority属性,来决定该入哪个队列。而出列时,则按priority较小的优先出列。为了管理这个比较高级的队列字典,Scheduler需要提供一系列的方法。说这么多有什么意义呢?最主要的指导意义就是:你要是换了redis做队列,这个scrapy下的Scheduler就用不了,所以自己写一个吧。于是就出现了scrapy-redis的专用scheduler:,其实可以对比一下看,操作什么的都差不太多。主要是操作的数据结构变了。----------------------------------------------被当众抓住的分割线---------------------------------------------------------那么既然使用了redis做主要数据结构,能不能把其他使用自带数据结构关键功能模块也换掉呢?(关键部分使用自带数据结构简直有种没穿小内内的危机感,这是本人的想法~)在我们爬取过程当中,还有一个重要的功能模块,就是request去重(已经发送过得请求就别再发啦,也要考虑一下服务器君的感受好伐)。scrapy中是如何实现这个去重功能的呢?用集合~scrapy中把已经发送的request指纹放入到一个集合中,把下一个request的指纹拿到集合中比对,如果该指纹存在于集合中,说明这个request发送过了,如果没有则继续操作。这个核心的判重功能是这样实现的:def request_seen(self, request):
#self.figerprints就是一个指纹集合
fp = self.request_fingerprint(request)
if fp in self.fingerprints:#这就是判重的核心操作。
return True
self.fingerprints.add(fp)
详见源代码:为了分布式,把这个集合也换掉吧,换了redis,照样也得把去重类给换了。于是就有了scrapy-redis的dupefilter:------------------------------------------把分割线掀起来(╯‵□′)╯︵||||||\\\\\\\\\\\\\------------------------------------那么依次类推,接下来的其他组件(Pipeline和Spider),我们也可以轻松的猜到,他们是为什么要被修改呢。对,都是因为redis,全怪redis,redis是罪魁祸首。(其实我不知道,我只是在凑字数而已)以上是我个人见解,不代表权威说法。希望对你有帮助。如有纰漏请指正(但是别打我
已有帐号?
无法登录?
社交帐号登录
做饭很好吃的程序员,一定会是个很好的金融分析师。redis 能做什么 - Redis专题 - 小象学院 - 中国最专业的Hadoop,Spark大数据在线教育平台——权威课程:Hadoop培训,Spark培训,HBase培训,Hive培训,Mahout培训等 - Powered By EduSoho
创建时间:
1521 次查看
Redis本身是一个cs模式的tcp server, client可以通过一个socket连续发起多个请求命令。 每个请求命令发出后client通常会阻塞并等待redis服务端处理,redis服务端处理完后将结果返回给client。
redis的pipeline(管道)功能在命令行中没有,但redis是支持pipeline的,而且在各个语言版的client中都有相应的实现。 由于网络开销延迟,即算redis server端有很强的处理能力,也由于收到的client消息少,而造成吞吐量小。当client 使用pipelining 发送命令时,redis server必须部分请求放到队列中(使用内存)执行完毕后一次性发送结果;如果发送的命名很多的话,建议对返回的结果加标签,当然这也会增加使用的内存;
Pipeline在某些场景下非常有用,比如有多个command需要被“及时的”提交,而且他们对相应结果没有互相依赖,而且对结果响应也无需立即获得,那么pipeline就可以充当这种“批处理”的工具;而且在一定程度上,可以较大的提升性能,性能提升的原因主要是TCP链接中较少了“交互往返”的时间。不过在编码时请注意,pipeline期间将“独占”链接,此期间将不能进行非“管道”类型的其他操作,直到pipeline关闭;如果你的pipeline的指令集很庞大,为了不干扰链接中的其他操作,你可以为pipeline操作新建Client链接,让pipeline和其他正常操作分离在2个client中。不过pipeline事实上所能容忍的操作个数,和socket-output缓冲区大小/返回结果的数据尺寸都有很大的关系;同时也意味着每个redis-server同时所能支撑的pipeline链接的个数,也是有限的,这将受限于server的物理内存或网络接口的缓冲能力。
你还没有登录,请先或!
(大小不能超过2MB,文件类型支持png ,jpg ,gif ,doc ,xls ,txt ,rar ,zip .)<!DOCTYPE HTML PUBLIC "-//W3C//DTD HTML 4.01 Transitional//EN"
您的访问请求被拒绝 403 Forbidden - ITeye技术社区
您的访问请求被拒绝
亲爱的会员,您的IP地址所在网段被ITeye拒绝服务,这可能是以下两种情况导致:
一、您所在的网段内有网络爬虫大量抓取ITeye网页,为保证其他人流畅的访问ITeye,该网段被ITeye拒绝
二、您通过某个代理服务器访问ITeye网站,该代理服务器被网络爬虫利用,大量抓取ITeye网页
请您点击按钮解除封锁&trackbacks-0
创建日期:日
修改日期:日
前段时间细节的了解了Jedis的使用,Jedis是redis的java版本的客户端实现。
本文做个总结,主要分享如下内容:
【pipeline】【分布式的id生成器】【分布式锁【watch】【multi】】【redis分布式】
好了,一个一个来。
一、 Pipeline
官方的说明是:starts a pipeline,which is a very efficient way to send lots of command and read all the responses when you finish sending them。简单点说pipeline适用于批处理。当有大量的操作需要一次性执行的时候,可以用管道。
Jedis&jedis&=&new&Jedis(String,&int);
Pipeline&p&=&jedis.pipelined();
p.set(key,value);//每个操作都发送请求给redis-server
p.get(key,value);
p.sync();//这段代码获取所有的response
这里我进行了20w次连续操作(10w读,10w写),不用pipeline耗时:187242ms,用pipeline耗时:1188ms,可见使用管道后的性能上了一个台阶。看了代码了解到,管道通过一次性写入请求,然后一次性读取响应。也就是说jedis是:request response,request response,...;pipeline则是:request request... response response的方式。这样无需每次请求都等待server端的响应。
二、&跨jvm的id生成器&
谈到这个话题,首先要知道redis-server端是单线程来处理client端的请求的。
这样来实现一个id生成器就非常简单了,只要简单的调用jdeis.incr(key);就搞定了。
你或许会问,incr是原子操作吗,能保证不会出现并发问题吗,前面说过,server端是单线程处理请求的。
三、&【跨jvm的锁实现【watch】【multi】】
首先说下这个问题的使用场景,有些时候我们业务逻辑是在不同的jvm进程甚至是不同的物理机上的jvm处理的。这样如何来实现不同jvm上的同步问题呢,其实我们可以基于redis来实现一个锁。具体事务和监听请参考文章:&暂时找到三种实现方式:
1. 通过jedis.setnx(key,value)实现
& &&&import&java.util.Rimport&mons.pool.impl.GenericObjectPool.Cimport&redis.clients.jedis.Jimport&redis.clients.jedis.JedisPimport&redis.clients.jedis.T/**&*&&#64;author&Teaey&*/public&class&RedisLock&{&&&&//加锁标志&&&&public&static&final&String&LOCKED&=&"TRUE";&&&&public&static&final&long&ONE_MILLI_NANOS&=&1000000L;&&&&//默认超时时间(毫秒)&&&&public&static&final&long&DEFAULT_TIME_OUT&=&3000;&&&&public&static&JedisPool&&&&&public&static&final&Random&r&=&new&Random();&&&&//锁的超时时间(秒),过期删除&&&&public&static&final&int&EXPIRE&=&5&*&60;&&&&static&{&&&&&&&&pool&=&new&JedisPool(new&Config(),&"host",&6379);&&&&}&&&&private&Jedis&&&&&private&String&&&&&//锁状态标志&&&&private&boolean&locked&=&false;&&&&public&RedisLock(String&key)&{&&&&&&&&this.key&=&&&&&&&&&this.jedis&=&pool.getResource();&&&&}&&&&public&boolean&lock(long&timeout)&{&&&&&&&&long&nano&=&System.nanoTime();&&&&&&&&timeout&*=&ONE_MILLI_NANOS;&&&&&&&&try&{&&&&&&&&&&&&while&((System.nanoTime()&-&nano)&&&timeout)&{&&&&&&&&&&&&&&&&if&(jedis.setnx(key,&LOCKED)&==&1)&{&&&&&&&&&&&&&&&&&&&&jedis.expire(key,&EXPIRE);&&&&&&&&&&&&&&&&&&&&locked&=&true;&&&&&&&&&&&&&&&&&&&&return&&&&&&&&&&&&&&&&&}&&&&&&&&&&&&&&&&//&短暂休眠,nano避免出现活锁&&&&&&&&&&&&&&&&Thread.sleep(3,&r.nextInt(500));&&&&&&&&&&&&}&&&&&&&&}&catch&(Exception&e)&{&&&&&&&&}&&&&&&&&return&false;&&&&}&&&&public&boolean&lock()&{&&&&&&&&return&lock(DEFAULT_TIME_OUT);&&&&}&&&&//&无论是否加锁成功,必须调用&&&&public&void&unlock()&{&&&&&&&&try&{&&&&&&&&&&&&if&(locked)&&&&&&&&&&&&&&&&jedis.del(key);&&&&&&&&}&finally&{&&&&&&&&&&&&pool.returnResource(jedis);&&&&&&&&}&&&&}}
2. 通过事务(multi)实现由于采纳第一张方法,第二种跟第三种实现只贴了关键代码,望谅解。^_^& & &public&boolean&lock_2(long&timeout)&{&&&&&&&&long&nano&=&System.nanoTime();&&&&&&&&timeout&*=&ONE_MILLI_NANOS;&&&&&&&&try&{&&&&&&&&&&&&while&((System.nanoTime()&-&nano)&&&timeout)&{&&&&&&&&&&&&&&&&Transaction&t&=&jedis.multi();&&&&&&&&&&&&&&&&//&开启事务,当server端收到multi指令&&&&&&&&&&&&&&&&//&会将该client的命令放入一个队列,然后依次执行,知道收到exec指令&&&&&&&&&&&&&&&&t.getSet(key,&LOCKED);&&&&&&&&&&&&&&&&t.expire(key,&EXPIRE);&&&&&&&&&&&&&&&&String&ret&=&(String)&t.exec().get(0);&&&&&&&&&&&&&&&&if&(ret&==&null&||&ret.equals("UNLOCK"))&{&&&&&&&&&&&&&&&&&&&&return&true;&&&&&&&&&&&&&&&&}&&&&&&&&&&&&&&&&//&短暂休眠,nano避免出现活锁&&&&&&&&&&&&&&&&Thread.sleep(3,&r.nextInt(500));&&&&&&&&&&&&}&&&&&&&&}&catch&(Exception&e)&{&&&&&&&&}&&&&&&&&return&false;&&&&}
3. 通过事务+监听实现& &&public&boolean&lock_3(long&timeout)&{&&&&&&&&long&nano&=&System.nanoTime();&&&&&&&&timeout&*=&ONE_MILLI_NANOS;&&&&&&&&try&{&&&&&&&&&&&&while&((System.nanoTime()&-&nano)&&&timeout)&{&&&&&&&&&&&&&&&&jedis.watch(key);&&&&&&&&&&&&&&&&//&开启watch之后,如果key的值被修改,则事务失败,exec方法返回null&&&&&&&&&&&&&&&&String&value&=&jedis.get(key);&&&&&&&&&&&&&&&&if&(value&==&null&||&value.equals("UNLOCK"))&{&&&&&&&&&&&&&&&&&&&&Transaction&t&=&jedis.multi();&&&&&&&&&&&&&&&&&&&&t.setex(key,&EXPIRE,&LOCKED);&&&&&&&&&&&&&&&&&&&&if&(t.exec()&!=&null)&{&&&&&&&&&&&&&&&&&&&&&&&&return&true;&&&&&&&&&&&&&&&&&&&&}&&&&&&&&&&&&&&&&}&&&&&&&&&&&&&&&&jedis.unwatch();&&&&&&&&&&&&&&&&//&短暂休眠,nano避免出现活锁&&&&&&&&&&&&&&&&Thread.sleep(3,&r.nextInt(500));&&&&&&&&&&&&}&&&&&&&&}&catch&(Exception&e)&{&&&&&&&&}&&&&&&&&return&false;&&&&}
最终采用第一种实现,因为加锁只需发送一个请求,效率最高。四、&【redis分布式】& & 最后一个话题,jedis的分布式。在jedis的源码里发现了两种hash算法(MD5,MURMUR Hash(默认)),也可以自己实现redis.clients.util.Hashing接口扩展。& &&List&JedisShardInfo&&hosts&=&new&ArrayList&JedisShardInfo&();&&&&&&&&//server1&&&&&&&&JedisShardInfo&host1&=&new&JedisShardInfo("",&);&&&&&&&&//server2&&&&&&&&JedisShardInfo&host2&=&new&JedisShardInfo("",&);&&&&&&&&hosts.add(host1);&&&&&&&&hosts.add(host2);&&&&&&&&ShardedJedis&jedis&=&new&ShardedJedis(hosts);&&&&&&&&jedis.set("key",&"");另外写博客真费力。。。
阅读(14011)
&re: Jedis使用总结【pipeline】【分布式的id生成器】【分布式锁【watch】【multi】】【redis分布式】
2.6之后,支持了script,又有新的办法了&&&&&&
&re: Jedis使用总结【pipeline】【分布式的id生成器】【分布式锁【watch】【multi】】【redis分布式】
没时间研究了,如果有新的,可以完善本文&#64;finallygo&&&&&&
&re: Jedis使用总结【pipeline】【分布式的id生成器】【分布式锁【watch】【multi】】【redis分布式】
请问博主,为什么我用了pipeline之后,设置10万条数据,程序反应不过来,倒是不用pipeline,程序还能正常运行呢?&&&&&&
&re: Jedis使用总结【pipeline】【分布式的id生成器】【分布式锁【watch】【multi】】【redis分布式】[未登录]
&#64;leealways887数据量太大了吧
内存用完了&&&&&&
&re: Jedis使用总结【pipeline】【分布式的id生成器】【分布式锁【watch】【multi】】【redis分布式】
什么叫反应不过来!据我说之,redis服务器应该不会反应不过来。&#64;leealways887&&&&&&
&re: Jedis使用总结【pipeline】【分布式的id生成器】【分布式锁【watch】【multi】】【redis分布式】
第二种的t.expire(key, EXPIRE);有问题吧 ,如果一直有锁请求,岂不是每次都把这个过期时间重新设置了?应该先判断下有没有超时,没有就设置,有的话就不设置。&&&&&&
阅读排行榜
评论排行榜

我要回帖

更多关于 redis pipeline get 的文章

 

随机推荐