数据流 为什么需要缓冲tris buffer缓冲液

求助,安卓蓝牙数据流的问题【android吧】_百度贴吧
&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&签到排名:今日本吧第个签到,本吧因你更精彩,明天继续来努力!
本吧签到人数:0成为超级会员,使用一键签到本月漏签0次!成为超级会员,赠送8张补签卡连续签到:天&&累计签到:天超级会员单次开通12个月以上,赠送连续签到卡3张
关注:1,207,291贴子:
求助,安卓蓝牙数据流的问题收藏
各位大大好,我是java小白+安卓小白,最近遇到一个头疼的问题:我在创建蓝牙连接之后,开启新线程读取数据:(以下为线程类里的代码)Thread ReadThread=new Thread(){//开辟缓冲区bufferbyte[] buffer = new byte[1024];int k=1;int num=0;public void run(){try {while(count==0){Log.e(TAG, &等待&);count =is1.available();}while(true){Log.e(TAG, &读入数据&);Log.e(TAG, &k=&+k);k++;num = inputstream.read(buffer); //读入数据Log.e(&TAG&, &num=&+num);Log.e(&TAG&, &buffer=&+buffer);String ss=toHexString1(buffer);//调用byte--&string方法//data=inputstream.read();if(is1.available()==0){Log.e(&TAG&, &break&);}}
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}};k是inputStream.read()执行的次数,num是读取进来的字节数量大概就这样执行12次左右(k++到12,11次num显示为1024,最后一次只有30~50),logcat 输出“break”,也就是说此时is1.available()==0可是我的蓝牙数据是一直不停得发送啊,求问各位大大为什么呢?还有就是,有一次我在if(is1.available()==0)之前写了data=is1.read();就出现了k++不停,而每一次num都是有几十,这时候数据还可靠吗?为什么会出现这种情况呢?
AutoTDS-V1型全自动热解吸仪是一款20位常温二次全自动热解吸仪,气路采....
楼主好三楼傻逼,玩的就是心跳。拒绝反弹。
求大大们解答一下
这个点有大神吗。。
安吧无大神全是水笔,去java吧
登录百度帐号推荐应用标签:至少1个,最多5个
什么是 RingBuffer
环形缓冲区:
维基百科的解释是:它是一种用于表示一个固定尺寸、头尾相连的缓冲区的数据结构,适合缓存数据流。
底层数据结构非常简单,一个固定长度的数组加一个写指针和一个读指针。
只要像这张图一样,把这个数组辦弯,它就成了一个 RingBuffer。
那它到底有什么精妙的地方呢?
我最近做的项目正好要用到类似的设计思路,所以翻出了以前在点评写的
系统,看了看以前自己写的代码。顺便写个文章总结一下。
Puma 是什么,为什么要用 RingBuffer
是一个 MySQL 数据库 Binlog 订阅消费系统。类似于阿里的 。
Puma 会伪装成一个 MySQL Slave,然后消费 Binlog 数据,并缓存在本地。当有客户端连接上来的时候,就会从本地读取数据给客户端消费。
如果用很一般的设计,一个单独的线程会从 MySQL 消费数据,存到本地文件中。然后每个客户端的连接都会有一个线程,从本地文件中读取数据。
没错,第一版就是这么简单粗暴,当然,它是有效的。
利用缓存优化性能
不用做性能测试就知道,系统压力大了以后这里必定会是一个瓶颈。读写数据会有一点延时,如果多个客户端同时读取同一份数据又会造成很多的浪费。然后数据的编码解码还会有不少损耗。
所以这里当然要加一个缓存了。
一个线程写数据到磁盘,然后它可以同时把数据传递给各个客户端。
听起来像发布者订阅者模式?也有点像生产者消费者模式?
但是,它并不仅仅是发布者订阅者模式,因为这里的“发布者”和“订阅者”是完全异步的,而且每个“订阅者”的消费速度是不一样的。
它也不仅仅是生产者消费者模式,因为这里的“消费者”是同时消费所有数据,而不是把数据分发给各个“消费者”。
它们的消费速度不一样,还会出现缓存内的数据过新,“消费者”不得不去磁盘读取。
感觉这里的需求是两种设计模式的结合。
不仅如此,这是一套高并发的系统,怎么保证性能,怎么保证数据一致性?
所以,这个缓存看上去简单,其实它不简单。
常规解决思路
目标明确后,看看 Java 的并发集合中有什么能满足需求的吧。
第一个想到的就是BlockingQueue,为每一个客户端创建一个BlockingQueue,BlockingQueue内部是通过加锁来实现的,虽然锁冲突不会很多,但高并发的情况下,最好还是能做到无锁。
当时正好看到了一系列介绍 Disruptor 的文章:
所以就在想能不能把 RingBuffer 用来解决我们的问题呢?
对 RingBuffer 进行改进
看完了 RingBuffer 的基本原理后,就要开始用它来适应我们的系统了。这里遇到了几个问题:
如何支持多个消费者
如何判断当前有无新数据,如何判断当前数据是否已经被新数据覆盖
如何保证数据一致性
第一个问题
这个问题简单,原始的 RingBuffer 只有一个写指针和一个读指针。
要支持多个消费者的话,只要为每个消费者创建一个读指针即可。
第二个问题
RingBuffer 的一个精髓就是,写指针和读指针的大小是会超过数组长度的,写入和读取数据的时候,是采用writeIndex % CACHED_SIZE这样的形式来读取的。
为什么要这么做?这就是为了解决判断有无新数据和数据是否已被覆盖的问题。
假设我内部2个指针,分别叫nextWriteIndex,nextReadIndex。
那么判断有无新数据的逻辑就是if (nextReadIndex &= nextWriteIndex),返回true的话就是没有新数据了。
而判断数据是否被覆盖的逻辑就是if (nextReadIndex & nextWriteIndex - CACHED_SIZE),返回true的话就是数据已经被覆盖了。
拿实际数据举个例子:
一个长度为10的 RingBuffer,内部是一个长度为10的数组。
此时nextWriteIndex=12,意味着它下一次写入的数据会在 12%10=2 上。此时,可读的有效范围是 2~11,对应的数组内的索引就是 2, 3, 4, 5, 6, 7, 8, 9, 0, 1。
所以,当nextReadIndex=12 的时候,会读到最老的数据2,这是老数据,不是新数据,此时表示没有行数据了。
当nextReadIndex=1 的时候,是新数据,而不是想要的老数据,老数据已经被覆盖掉了,此时它没办法从缓存里读数据了。
第三个问题
最棘手的第三个问题来了,这个系统是要支持高并发的,如果是同步的操作,上面的代码没有任何问题。或者说,如果是同步的代码,干嘛还要用 RingBuffer 呢?
上面写入和读取,都有两步操作,更改数据和更改索引,按照逻辑上来讲,它们应该是强一致性的。只能加锁了?如果要加锁,为何不直接用BlockingQueue?
所以,是否可以通过什么方法,高并发和最终一致性呢?
直接贴代码吧,根据代码一步步分析:
public class CachedDataStorage {
private static final int CACHED_SIZE = 5000;
private final ChangedEventWithSequence[] data = new ChangedEventWithSequence[CACHED_SIZE];
private volatile long nextWriteIndex = 0;
public void append(Object dataValue) {
data[(int) (nextWriteIndex % CACHED_SIZE)] = dataV
nextWriteIndex++;
public Reader createReader() {
return new Reader();
public class Reader {
private Reader() {
private volatile long nextReadIndex = 0;
public Object next() throws IOException {
if (nextReadIndex &= nextWriteIndex) {
if (nextReadIndex &= nextWriteIndex - CACHED_SIZE) {
throw new IOException("data outdated");
Object dataValue = data[(int) (nextReadIndex % CACHED_SIZE)];
if (nextReadIndex &= nextWriteIndex - CACHED_SIZE) {
throw new IOException("data outdated");
nextReadIndex++;
return dataV
我们来一步步分析,先看内部的Reader和createReader()方法,每来一个客户端就会创建一个Reader,每个Reader会维护一个nextReadIndex。
然后看append()方法,可以说没有任何逻辑,直接写入数据,修改索引就结束了。但是,别小看了这两个步骤的操作顺序。
好了,到了最复杂的next()方法了,这里可就大有讲究了。
一进来立刻执行if (nextReadIndex &= nextWriteIndex),用来判断当前是否还有更新的数据。
因为写入的时候是先写数据再改索引,所以可能会出现明明有数据,但是这里认为没数据的情况。
但是并没有关系,我们更关注最终一致性,因为我们要的是确保这里一定不会读错数据,而不一定要确保这里有新数据就要立刻处理。就算这一轮没读到,下一轮也一定会读取到了。
下一步是这一行if (nextReadIndex &= nextWriteIndex - CACHED_SIZE),判断想要读取的数据有没有被新数据覆盖。等一下,这里为什么和上面介绍的不一样?
上面写的是&,而这里却是&=。上面提到,同步操作的情况下,用&是没有问题的,但是这里的异步的。
写入数据的时候,可能会出现数据已被覆盖,而索引未被更新的问题,所以这样子判断可以保证不会读错数据。
既然上下边界都检查过了,那么就读取数据吧!就当这里准备读数据的时候,写数据的线程竟然又写入了好多数据,导致读出来的数据已经被覆盖了!
所以,一定要在读完数据后,再次检查数据是否被覆盖。
最终,整个过程实现了无锁,高并发和最终一致性。
在 Puma 系统中,启用缓存和关闭缓存,一写五读的情况下,性能整整提高了一倍。测试还是在我 SSD 上进行的,如果是传统硬盘,提升会更明显。
利用 RingBuffer 实现后的优点
代码实现完,就可以和BlockingQueue对比一下了。
首先,RingBuffer 完全是无锁的,没有任何锁冲突。而利用BlockingQueue的话它内部会加锁,虽然锁冲突不会很多,但是没锁肯定比有锁好。而且,当 Writer 往多个BlockingQueue中顺序写入数据的时候,会有相互影响。而利用 RingBuffer 实现的话,无论有多少 Reader,都不会影响写入性能。
然后是内存上的优势,每次多一个 Reader 仅仅是多一个对象,Reader对象内部也只有一个变量,占用内存非常非常小。而使用BlockingQueue的话,需要创建的就不仅仅是一个对象了,会有一系列的东西。
目前看来,该实现能满足我们的需求且无明显缺点,而且已经在系统中平稳运行了将近一年了。在我离职之前的最后一个项目,就包括为点评订单系统接入 Puma,订单系统在活动期间的写入 QPS 非常高。但对 Puma 来说也是毫无压力的,最终的瓶颈都不是在 Puma 上,而是在目标数据库的写入性能上。
高并发系统的设计思路
首先,这部分的代码可以在这里找到:
完整的代码还包含了老数据被覆盖无数据可读时的数据源切换逻辑,还有当无消费者时关闭 RingBuffer 的逻辑。上面的代码已经被简化了很多,想看完整代码的话可以在上面的链接中看到。
以后有空还会再介绍更多 Puma 中遇到的问题和解决的思路。
然后谈谈高并发系统的设计。
Java 并发编程的第一重境界是善用各种锁,尽量减少锁冲突,不能有死锁。
第二重境界就是善用 Java 的各种并发包,Java 的并发包里有的是无锁的,例如AtomicLong中用了CAS;有的是用了各种手段减少锁冲突,例如ConcurrentHashMap中就用了锁分段技术。整体效率都非常高,能熟练应用后也能写出很高效的程序。
再下一个境界就非常搞脑子了,往往是放弃了强一致性,而去追求最终一致性。其中会用到AtomicLong等无锁,或锁分段技术,并且常常会把它们结合起来用。就像上面那部分代码,看似简单,但实际上却要把各种边界条件思考地很全面,因为是最终一致性,所以中间的状态非常多。
1 收藏&&|&&2
你可能感兴趣的文章
99 收藏,4.1k
2 收藏,375
2 收藏,223
本作品采用 署名-非商业性使用-禁止演绎 4.0 国际许可协议 进行许可
分享到微博?
你好!看起来你挺喜欢这个内容,但是你还没有注册帐号。 当你创建了帐号,我们能准确地追踪你关注的问题,在有新答案或内容的时候收到网页和邮件通知。还能直接向作者咨询更多细节。如果上面的内容有帮助,记得点赞 (????)? 表示感谢。
明天提醒我
我要该,理由是:您的访问出错了(404错误)
很抱歉,您要访问的页面不存在。
1、请检查您输入的地址是否正确。
进行查找。
3、感谢您使用本站,1秒后自动跳转SSIS技巧--优化数据流缓存
来源:博客园
我们经常遇到一种情况,在SSMS中运行很慢的一个查询,当把查询转化成从源到目的数据库的SSIS数据流以后,需要花费几倍的时间!源和数据源都没有任何软硬件瓶颈,并且没有大量的格式转换。之前看了很多关于这种情况的优化方案,例如扩大缓存大小等。虽然也能快一点,但是仍然远远比直接在SSMS中查询的速度满的多。究竟是什么原因导致的呢?
解决
首先这个数据流性能是有很多因素决定的,例如源数据的速度、目标库的写入速度、数据转换和路径数量的使用等等。但是,如果只是一个很简单的数据流,那么提高缓存的容量即可改善性能。例如,如果缓存设的更大,那么数据流一次转换更多的数据行,所以性能可以提升。当然很多其他情况就不是这么容易优化了。并且缓存过大时一旦源读取填充缓存时间过长导致了目标库闲置一直处于等待状态直到缓存完成。在这个技巧中,将会介绍如何解决这种问题。
 
测试场景
首先创建一个百万数据的源表。表结构是一个典型的name-value 键值对表,便于阐述我们的问题。其中value 列设为5000char。如下:

IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[NameValuePairs]') AND [type] IN (N'U'))
 DROP TABLE [dbo].[NameValuePairs];
GO

IF NOT EXISTS (SELECT 1 FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[NameValuePairs]') AND [type] IN (N'U'))
BEGIN
 CREATE TABLE [dbo].[NameValuePairs]
([ID] [int] IDENTITY(1,1) NOT NULL
,[Type] [varchar](100) NOT NULL
,[Value] [varchar](5000) NULL
PRIMARY KEY CLUSTERED ([ID] ASC));
END
GO


  
使用AdventureWorksDW2012 样板数据,你可以搜索下载。表中有各种用户信息:names, gender, addresses, birth dates, email addresses 和phone numbers。如下:
 

INSERT INTO [dbo].[NameValuePairs]([Type],[Value])
SELECT
 [Type] = 'Customer Name'
 ,[Value] = [FirstName] + ' ' + [LastName]
FROM [AdventureWorksDW2012].[dbo].[DimCustomer]

UNION ALL

SELECT
 [Type] = 'BirthDate'
 ,[Value] = CONVERT(CHAR(8),[BirthDate],112)
FROM [AdventureWorksDW2012].[dbo].[DimCustomer]

UNION ALL

SELECT
 [Type] = 'Gender'
 ,[Value] = [Gender]
FROM [AdventureWorksDW2012].[dbo].[DimCustomer]

UNION ALL

SELECT
 [Type] = 'Email Address'
 ,[Value] = [EmailAddress]
FROM [AdventureWorksDW2012].[dbo].[DimCustomer]

UNION ALL

SELECT
 [Type] = 'Address'
 ,[Value] = [AddressLine1]
FROM [AdventureWorksDW2012].[dbo].[DimCustomer]

UNION ALL

SELECT
 [Type] = 'Phone Number'
 ,[Value] = [Phone]
FROM [AdventureWorksDW2012].[dbo].[DimCustomer];
GO 500


  
 
当然也可以自己写一个循环脚本插入数据。DimCustomer 维度表中有18000行数据,通过不同的结果集能返回110,000行数据 。注意这个语句INSERT …SELECT … ,最后有个GO,这不是官方的,但是也是可以用的,后面紧跟的数字表示批处理执行的次数。本例中就是500次。意味着5,500,000行数据被插入,大概有2.3gb。

比如我们可查询邮箱地址:

SELECT [Customer Email] = [Value]
FROM [dbo].[NameValuePairs]
WHERE [Type] = 'Email Address';


  
查询会返回9,242,000 行数据用33秒左右。这个是我们包的最快运行的时间理论上。那么包能不能运行的更快呢?SSIS中将邮件地址转换成邮箱维度表,该列在新表中只有50个字符的宽度,但是在源表中的该列却是5000个字符。但是我们知道在本例中这个邮箱地址不会超过50个字符。

CREATE TABLE dbo.DimEmail
 ([SK_Email] INT IDENTITY(1,1) NOT NULL
 ,[Email Address] VARCHAR(50) NOT NULL
 ,[InsertDate] DATE NOT NULL);


  
SSIS包
生成包是相对简单的,整个控制流由4分任务组成:
第一个任务是记录包开始的日志。
第二个任务是清空目标表。
第三个任务是数据流任务,下面详细介绍。
最后日志记录任务结束。

数据流本身也是很简单:使用前面提到查询读取数据源,然后将加入了审核列和目标表的派生列将结果集写入邮箱维度表。

目标数据库展示了一个截断警告,因为我们试图将超过目标表字段长度的数据插入进来。
初始性能
为了限制外部影响,目标数据库的日志和数据文件足够大,不会影响整个事务。在开发环境下,整个包运行了大约40秒。这是要比直接查询慢的!写入操作是可以被优化的。下面看一下如何优化行数据的插入…
优化数据流
之前提到的最佳实践之一就是扩大缓冲区,具体操作就是修改数据流属性里面的DefaultBufferMaxRows(默认缓存最大行数) 和DefaultBufferSize(默认缓存大小)。SSIS引擎就是使用这个属性来估计在管道中传送数据的缓存大小。更大的缓存意味着更多行可以被同时处理。
当设定最大值行数为30000并且默认缓存为20M的时候,执行包花费了30秒,这也仅仅比之前源查询快了一点。所以还应该有空间去优化。
在源组件端,估计行的大小是取决于查询返回所有列中的最大列。这也是性能问题的所在:我们建立的键值对表,最大列我5000字符,SSIS引擎将会认为这个列一定包含5000个字符,及时实际上小于50个字符。5000个非Unicode字符等于5000个字节或者5kb。默认的缓存大小事10MB,因此意味着一次仅仅能存储2000行数据,15分之一。这也意味着我们我们并没有最优化的使用缓存。
那么我们只需要调整源数据查询映射的实际数据长度,就能够实现潜在性能的提升。如下:

SELECT [Customer Email] = CONVERT(VARCHAR(50),[Value])
FROM [dbo].[NameValuePairs]
WHERE [Type] = 'Email Address';


  
既然我们已经知道该列最大的是50个字符,改成这样以后一次性能多放入一百倍的数据。当包运行时数据流执行仅仅用了12秒!

我们可以看一下三次不同的包的执行比较(默认配置--扩大缓存--扩大缓存并减小列宽),分别在SSIS catalog 中运行20次在,曲线图如下:

不用多说大家都知道这三种性能如何了。

总结
本篇只是针对数据流进行了优化,并不涉及SQL本身的优化,这里偏重BI一点。通过关注返回源数据的列宽,极大的提高了性能,除此之更小的列性能更好。一次性缓存的行也越多。通过扩大缓存也进一步能提升性能
免责声明:本站部分内容、图片、文字、视频等来自于互联网,仅供大家学习与交流。相关内容如涉嫌侵犯您的知识产权或其他合法权益,请向本站发送有效通知,我们会及时处理。反馈邮箱&&&&。
学生服务号
在线咨询,奖学金返现,名师点评,等你来互动

我要回帖

更多关于 为什么用stringbuffer 的文章

 

随机推荐