redis pipeline 作用是做什么用的

pika导致redis客户端使用pipeline卡死的问题排查_360基础架构组_【传送门】
pika导致redis客户端使用pipeline卡死的问题排查
360基础架构组
有人在github提issue反馈redis客户端使用pipeline往pika灌数据,当数据量稍大一些,客户端会卡死,定位了一下问题,其实原因也很简单,这里总结一下吧Pipeline什么是Pipeline?假设现在有3条命令要执行,正常的请求方式是:客户端发送第一条命令给服务端,服务端处理,返回结果给客户端,客户端发送第二条命令给服务端……以此类推,可以看到这三条命令从开始到结束,伴随消耗着3次网络的RTT(Round Trip Time),试想如果将这三条命令打包,一起发给服务端,全发出去之后再接收服务端对这三条命令执行结果的打包返回,这样RTT消耗从3次减少成1次,减少了整体命令响应延迟,提高效率Pipeline的支持主要是在客户端这里,redis官方c客户端hiredis就很好地支持了Pipeline,其实也不难,通过redisAppendCommand来追加命令到buffer,这个时候并没有真正的发送给服务端,直到调用redisGetReply的时候,才真正发送给服务端并且等待读取服务端的结果问题定位看到反馈之后,我用hiredis写了一个简单的pipeline测试程序,就是打包1000个Ping命令发送给pika,并且期待接收服务端返回的1000个Pong,一跑,没问题。。。把Ping命令的个数调大到30万,果不其然客户端卡死了,通过htop看了下pika的状态,负责接收处理请求的worker线程还是在正常的空刷epoll_wait,并且可以处理别的客户端的请求,看来pika这里“没有问题”,再看下客户端,发现客户端卡死到了write调用上,证明此时客户端的命令发送不出去,“命令少没问题,命令多会卡死到write调用上”,猜测是客户端这里的TCP发送缓冲区写满了,为什么会满呢?难道Pika端的TCP接收缓冲区也满了导致客户端TCP窗口中的报文迟迟收不到对端确认接收的ACK所以撑爆了?不会啊,如果pika端TCP接收缓冲区有消息的话在刚才htop里对应的worker线程不可能一直在空刷epoll_wait,至少有个read调用什么的,推测到这里感觉有点走不下去了。反正目前结论就是有可能pika端TCP接收缓冲区和客户端TCP发送缓冲区都满了硬着头皮接着瞎猜,因为pipeline下,客户端在没有发送完所有命令之前,是不可能去接收服务端返回的结果,所以在上面的情况下(客户端卡死在write调用上证明命令还没全部发送完)pika的TCP发送缓冲区也有可能写满,嗯,只能靠试试了,我改了下pika的代码,调大了TCP发送缓冲区到1M,居然没问题了。。。难道redis调整了这个缓冲区大小了吗?在redis代码里搜了一下SO_SNDBUF,没看到它有调整,应该用的是默认值,那为什么同样使用默认值的pika就不行了呢?该不会pika事件处理那里写的有问题?看下代码if (pfe->mask_ & EPOLLIN) {
in_conn = static_cast(iter->second);
ReadStatus getRes = in_conn->GetRequest();
in_conn->set_last_interaction(now);
log_info("now: %d, %d", now.tv_sec, now.tv_usec);
log_info("in_conn->is_reply() %d", in_conn->is_reply());
if (getRes != kReadAll && getRes != kReadHalf) {
should_close = 1;
} else if (in_conn->is_reply()) {
pink_epoll_->PinkModEvent(pfe->fd_, 0, EPOLLOUT);
}if (pfe->mask_ & EPOLLOUT) {
in_conn = static_cast(iter->second);
log_info("in work thead SendReply before");
WriteStatus write_status = in_conn->SendReply();
log_info("in work thead SendReply after");
if (write_status == kWriteAll) {
in_conn->set_is_reply(false);
pink_epoll_->PinkModEvent(pfe->fd_, 0, EPOLLIN);
} else if (write_status == kWriteHalf) {
} else if (write_status == kWriteError) {
should_close = 1;
}原来真有问题,之前认为pika一定是开始只监听fd读事件,当解析出来一条命令后执行并生成返回结果然后开始只监听fd写事件,直到结果发送给客户端后,再重新监听fd读事件。这样的逻辑在常规的客户端读写都不会有问题,不过在pipeline下,由于客户端迟迟不读服务端的返回结果(直到发送完全部命令),很容易导致pika的TCP发送缓冲区变满,按照上面的逻辑,此时pika不认为把返回结果成功发送给了客户端,所以不断地监听只fd写事件,而这个时候该fd不可写(缓冲区满),所以迟迟来不了fd可写事件,然后此时又不关注fd可读事件,不能再读取客户端发来的后续命令,导致客户端的TCP发送缓冲区也满了,产生死锁,所以卡死在write调用上。将有问题的那一行代码改成:pink_epoll_->PinkModEvent(pfe->fd_, EPOLLIN, EPOLLOUT);表示持续关注fd可读事件,问题迎刃而解。其实redis也是这么做的,一直关注fd可读事件,只在需要的时候打开fd写事件的监听。总结对微不足道的细节问题也要足够关心,尤其网络和事件这里pika开源的收获还挺多的,不少同学反映了很多在公司内部使用pika没有遇见的问题,帮助pika更好的完善点击右下角“阅读原文”,查看更多内容^^
觉得不错,分享给更多人看到
360基础架构组 微信二维码
分享这篇文章
7月23日 22:35
360基础架构组 最新文章
360基础架构组 热门文章redis pipeline 结果集有序吗_百度知道
redis pipeline 结果集有序吗Redis结合EntityFramework结合使用的操作类
Redis结合EntityFramework结合使用的操作类
[摘要:比来一段时光正在研讨redis。 种种没有懂, 种种题目。也看了N多的材料。 终究参照着 张占岭 的专客 /lori/p/3435483.html 写了一套redis取entityframework联合的操纵类。 若有甚么没有]
最近一段时间在研究redis。 &各种不懂, 各种问题。也看了N多的资料。 &
最终参照着 &张占岭 的博客 &/lori/p/3435483.html & 写了一套redis与entityframework结合的操作类。
如有什么不正确的地方,请指明。 &
using ServiceStack.R
using ServiceStack.Redis.G
using System.Collections.G
using System.L
using System.T
using System.Data.E
using System.Threading.T
using System.Linq.E
using ServiceStack.Redis.P
using ServiceStack.Redis.S
using System.Data.SqlC
namespace Redis
/// &summary&
/// 操作redis与entity framework
/// &/summary&
public class RedisRepository&TEntity& :
IDisposable
where TEntity : class
public DbC
IRedisClient redisDB;
IRedisTypedClient&TEntity& redisTypedC
IRedisList&TEntity&
public RedisRepository(DbContext context)
this.context =
redisDB = new RedisClient("192.168.2.47", 6379);//redis服务IP和端口
redisTypedClient = redisDB.As&TEntity&();
table = redisTypedClient.Lists[typeof(TEntity).Name];
#region Repository&TEntity&成员
/// &summary&
/// 添加一条数据,该操作会同时插入到mssql和redis
/// &/summary&
/// &param name="item"&数据模型&/param&
/// &returns&是否成功,成功返回1,失败返回0&/returns&
public int Insert(TEntity item)
int result = 0;
if (item != null)
context.Set&TEntity&().Add(item);
result = context.SaveChanges();
if (result & 0)
Task.Run(async () =& await AsyncAddEntity(item));//异步向redis插入数据。
/// &summary&
/// 删除单条数据操作,同时删除redis和SqlServer
/// &/summary&
/// &param name="keysValue"&删除条件&/param&
/// &returns&是否成功,成功返回1,失败返回0&/returns&
public int Delete(params object[] keysValue)
var entity = context.Set&TEntity&().Find(keysValue);
context.Set&TEntity&().Remove(entity);
int result = context.SaveChanges();
if (result & 0)
Task.Run(async () =& await AsyncDelEntity(entity));//异步删除一条数据
/// &summary&
/// 修改操作,同时修改SqlServer和redis
/// &/summary&
/// &param name="itemOld"&旧数据&/param&
/// &param name="item"&新数据&/param&
/// &returns&是否成功,成功返回1,失败返回0&/returns&
public int Update(Func&TEntity, bool& func, TEntity item)
context.Entry&TEntity&(item).State = EntityState.M
int result = context.SaveChanges();
if (result & 0)
Task.Run(async () =& await AsyncEditEntity(func, item));//异步修改redis数据。
/// &summary&
/// 批量删除数据 ,同时删除redis和SqlServer
/// &/summary&
/// &param name="func"&删除条件&/param&
/// &returns&返回删除成功的行数&/returns&
public int DeleteSelect(Func&TEntity, bool& func)
var entities = context.Set&TEntity&().Where(func);
context.Set&TEntity&().RemoveRange(entities);
int result = context.SaveChanges();
if (result & 0)
Task.Run(async () =& await AsyncDelEntity(func));//异步删除redis数据。
/// &summary&
/// 获取所有数据
/// &/summary&
/// &returns&&/returns&
public IQueryable&TEntity& GetModel()
return table.GetAll().AsQueryable();
/// &summary&
/// 获取分页数据
/// &/summary&
/// &param name="func"&查询条件&/param&
/// &param name="keySelector"&排序字段&/param&
/// &param name="pageIndex"&获取页面的页数&/param&
/// &param name="pageSize"&页面行数&/param&
/// &param name="totalPage"&返回数据行总数&/param&
/// &returns&&/returns&
public IList&TEntity& GetModel(Func&TEntity, bool& func, Func&TEntity, object& keySelector, int pageIndex, int pageSize, out int totalPage)
int startRow = (pageIndex - 1) * pageS
totalPage = table.Count();
//判断缓存中数据是否为空并且数据库内数据行数是否与缓存中行数一致,如果为空或者不一致
从数据库查询数据 异步插入到缓存中。
int dbCount = context.Set&TEntity&().Count();
if (dbCount != totalPage || totalPage == 0)
totalPage = dbC
List&TEntity& listDB = context.Set&TEntity&().AsQueryable().ToList();
Task.Run(async () =& await AsyncAddEntity(listDB));//异步向redis插入数据。
return context.Set&TEntity&().AsQueryable().Where(func).OrderBy(keySelector).Skip(startRow).Take(pageSize).ToList();
return table.GetAll().AsQueryable().Where(func).OrderBy(keySelector).Skip(startRow).Take(pageSize).ToList();
/// &summary&
/// 查询单条数据
/// &/summary&
/// &param name="func"&查询条件&/param&
/// &returns&&/returns&
public TEntity Find(Func&TEntity, bool& func)
return table.Where(func).FirstOrDefault();
/// &summary&
/// 异步插入到redis列表
/// &/summary&
/// &param name="listDB"&&/param&
/// &returns&&/returns&
private Task AsyncAddEntity(List&TEntity& listDB)
return Task.Factory.StartNew(() =&
table.RemoveAll();
listDB.ForEach(m =& redisTypedClient.AddItemToList(table, m));
redisDB.Save();
/// &summary&
异步插入到redis单条
/// &/summary&
/// &param name="entity"&&/param&
/// &returns&&/returns&
private Task AsyncAddEntity(TEntity entity)
return Task.Factory.StartNew(() =&
redisTypedClient.AddItemToList(table, entity);
redisDB.Save();
/// &summary&
/// 异步删除一条数据
/// &/summary&
/// &param name="entity"&&/param&
/// &returns&&/returns&
private Task AsyncDelEntity(TEntity entity)
return Task.Factory.StartNew(() =&
redisTypedClient.RemoveItemFromList(table, entity);
redisDB.Save();
private Task AsyncDelEntity(Func&TEntity, bool& func)
return Task.Factory.StartNew(() =&
table.GetAll().AsQueryable().Where(func).ToList().ForEach(m =& redisTypedClient.RemoveItemFromList(table, m));
redisDB.Save();
/// &summary&
/// 修改一条数据
/// &/summary&
/// &param name="func"&&/param&
/// &param name="item"&&/param&
/// &returns&&/returns&
private Task AsyncEditEntity(Func&TEntity, bool& func, TEntity item)
return Task.Factory.StartNew(() =&
redisTypedClient.RemoveItemFromList(table, Find(func));
redisTypedClient.AddItemToList(table, item);
redisDB.Save();
#endregion
#region IDisposable成员
public void Dispose()
this.ExplicitDispose();
#endregion
#region Protected Methods
/// &summary&
/// 垃圾回收
/// &/summary&
protected void ExplicitDispose()
this.Dispose(true);
GC.SuppressFinalize(this);
protected void Dispose(bool disposing)
if (disposing)//清除非托管资源
table = null;
redisTypedClient = null;
redisDB.Dispose();
#endregion
#region Finalization Constructs
/// &summary&
/// Finalizes the object.
/// &/summary&
~RedisRepository()
this.Dispose(false);
#endregion
感谢关注 Ithao123Redis频道,是专门为互联网人打造的学习交流平台,全面满足互联网人工作与学习需求,更多互联网资讯尽在 IThao123!
Laravel是一套简洁、优雅的PHP Web开发框架(PHP Web Framework)。它可以让你从面条一样杂乱的代码中解脱出来;它可以帮你构建一个完美的网络APP,而且每行代码都可以简洁、富于表达力。
Hadoop是一个由Apache基金会所开发的分布式系统基础架构。
用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。
Hadoop实现了一个分布式文件系统(Hadoop Distributed File System),简称HDFS。HDFS有高容错性的特点,并且设计用来部署在低廉的(low-cost)硬件上;而且它提供高吞吐量(high throughput)来访问应用程序的数据,适合那些有着超大数据集(large data set)的应用程序。HDFS放宽了(relax)POSIX的要求,可以以流的形式访问(streaming access)文件系统中的数据。
Hadoop的框架最核心的设计就是:HDFS和MapReduce。HDFS为海量的数据提供了存储,则MapReduce为海量的数据提供了计算。
产品设计是互联网产品经理的核心能力,一个好的产品经理一定在产品设计方面有扎实的功底,本专题将从互联网产品设计的几个方面谈谈产品设计
随着国内互联网的发展,产品经理岗位需求大幅增加,在国内,从事产品工作的大部分岗位为产品经理,其实现实中,很多从事产品工作的岗位是不能称为产品经理,主要原因是对产品经理的职责不明确,那产品经理的职责有哪些,本专题将详细介绍产品经理的主要职责
IThao123周刊  手册得知 pipeline 只是把多个redis指令一起发出去,redis并没有保证这些指定的执行是原子的;multi相当于一个redis的transaction的,保证整个操作的原子性,避免由于中途出错而导致最后产生的数据不一致。通过测试得知,pipeline方式执行效率要比其他方式高10倍左右的速度,启用multi写入要比没有开启慢一点。
  上代码,望高手指点。
set_time_limit(0);
ini_set('memory_limit','1024M');
$redis = new Redis();
$redis-&connect('127.0.0.1');
//不具备原子性 ,管道
$redis-&pipeline();
for ($i=0;$i&100000;$i++)
$redis-&set("test_{$i}",pow($i,2));
$redis-&get("test_{$i}");
$redis-&exec();
$redis-&close();
G('1','e');
$redis-&connect('127.0.0.1');
//事物具备原子性
$redis-&multi();
for ($i=0;$i&100000;$i++)
$redis-&set("test_{$i}",pow($i,2));
$redis-&get("test_{$i}");
$redis-&exec();
$redis-&close();
G('2','e');
$redis-&connect('127.0.0.1');
//事物具备原子性
for ($i=0;$i&100000;$i++)
$redis-&set("test_{$i}",pow($i,2));
$redis-&get("test_{$i}");
$redis-&close();
G('3','e');
function G($star,$end = '')
static $info = array();
if (!empty($end))
$info[$end] = microtime(true);
$sconds = $info[$end] - $info[$star];
echo $sconds,"ms&br/&";
$info[$star] =
microtime(true);
测试输出的结果:
0.037ms0.2ms0.28ms
阅读(...) 评论()

我要回帖

更多关于 redis pipeline multi 的文章

 

随机推荐