createdirectstream junior和createstream的区别

php中stream(流)的用法
字体:[ ] 类型:转载 时间:
Stream是PHP开发里最容易被忽视的函数系列(SPL系列,Stream系列,pack函数,封装协议)之一,但其是个很有用也很重要的函数。Stream可以翻译为“流”,下面是使用方法
在Java里,流是一个很重要的概念。
流(stream)的概念源于UNIX中管道(pipe)的概念。在UNIX中,管道是一条不间断的字节流,用来实现程序或进程间的通信,或读写外围设备、外部文件等。根据流的方向又可以分为输入流和输出流,同时可以在其外围再套上其它流,比如缓冲流,这样就可以得到更多流处理方法。
PHP里的流和Java里的流实际上是同一个概念,只是简单了一点。由于PHP主要用于Web开发,所以“流”这块的概念被提到的较少。如果有Java基础,对于PHP里的流就更容易理解了。其实PHP里的许多高级特性,比如SPL,异常,过滤器等都参考了Java的实现,在理念和原理上同出一辙。
比如下面是一段PHP SPL标准库的用法(遍历目录,查找固定条件的文件):
代码如下:class RecursiveFileFilterIterator extends FilterIterator{&&& // 满足条件的扩展名&&& protected $ext = array('jpg','gif');
&&& /**&&&& * 提供 $path 并生成对应的目录迭代器&&&& */&&& public function __construct($path)&&& {&&&&&&& parent::__construct(new RecursiveIteratorIterator(new RecursiveDirectoryIterator($path)));&&& }
&&& /**&&&& * 检查文件扩展名是否满足条件&&&& */&&& public function accept()&&& {&&&&&&& $item = $this-&getInnerIterator();&&&&&&& if ($item-&isFile() && in_array(pathinfo($item-&getFilename(), PATHINFO_EXTENSION), $this-&ext))&&&&&&& {&&&&&&&&&&& return TRUE;&&&&&&& }&&& }}
// 实例化foreach (new RecursiveFileFilterIterator('D:/history') as $item){&&& echo $item . PHP_EOL;}
Java里也有和其同出一辙的代码:
代码如下:public class DirectoryContents{&&& public static void main(String[] args) throws IOException&&& {&&&&&&& File f = new File("."); // current directory
&&&&&&& FilenameFilter textFilter = new FilenameFilter()&&&&&&& {&&&&&&&&&&& public boolean accept(File dir, String name)&&&&&&&&&&& {&&&&&&&&&&&&&&& String lowercaseName = name.toLowerCase();&&&&&&&&&&&&&&& if (lowercaseName.endsWith(".txt"))&&&&&&&&&&&&&&& {&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& }&&&&&&&&&&&&&&& else&&&&&&&&&&&&&&& {&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&& }&&&&&&&&&&& }&&&&&&& };
&&&&&&& File[] files = f.listFiles(textFilter);
&&&&&&& for (File file : files)&&&&&&& {&&&&&&&&&&& if (file.isDirectory())&&&&&&&&&&& {&&&&&&&&&&&&&&& System.out.print("directory:");&&&&&&&&&&& }&&&&&&&&&&& else&&&&&&&&&&& {&&&&&&&&&&&&&&& System.out.print("&&&& file:");&&&&&&&&&&& }
&&&&&&&&&&& System.out.println(file.getCanonicalPath());&&&&&&& }&&& }}
举这个例子,一方面是说明PHP和Java在很多方面的概念是一样的,掌握一种语言对理解另外一门语言会有很大的帮助;另一方面,这个例子也有助于我们下面要提到的过滤器流-filter。其实也是一种设计模式的体现。
我们可以通过几个例子先来了解stream系列函数的使用。
下面是一个使用socket来抓取数据的例子:
代码如下:$post_ =array (&'author' =& 'Gonn',&'mail'=&'',&'url'=&'http://www.nowamagic.net/',&'text'=&'欢迎访问简明现代魔法');
$data=http_build_query($post_);$fp = fsockopen("nowamagic.net", 80, $errno, $errstr, 5);
$out="POST http://nowamagic.net/news/1/comment HTTP/1.1\r\n";$out.="Host: typecho.org\r\n";$out.="User-Agent: Mozilla/5.0 (W U; Windows NT 6.1; zh-CN; rv:1.9.2.13) Gecko/ Firefox/3.6.13"."\r\n";$out.="Content-type: application/x-www-form-urlencoded\r\n";$out.="PHPSESSID=082b0cc33cc7e6df1feb0\r\n";$out.="Content-Length: " . strlen($data) . "\r\n";$out.="Connection: close\r\n\r\n";$out.=$data."\r\n\r\n";
fwrite($fp, $out);while (!feof($fp)){&&& echo fgets($fp, 1280);}
fclose($fp);
我们也可以用stream_socket 实现,这很简单,只需要打开socket的代码换成下面的即可:
代码如下:$fp = stream_socket_client("tcp://nowamagic.net:80", $errno, $errstr, 3);
再来看一个stream的例子:
file_get_contents函数一般常用来读取文件内容,但这个函数也可以用来抓取远程url,起到和curl类似的作用。
代码如下:$opts = array (&'http'=&array(&&& 'method' =& 'POST',&&& 'header'=& "Content-type: application/x-www-form-urlencoded\r\n" .&&&&& "Content-Length: " . strlen($data) . "\r\n",&&& 'content' =& $data));
$context = stream_context_create($opts);file_get_contents('http://www.jb51.net/news', false, $context);
注意第三个参数,$context,即HTTP流上下文,可以理解为套在file_get_contents函数上的一根管道。同理,我们还可以创建FTP流,socket流,并把其套在对应的函数在。
更多关于 stream_context_create,可以参考:PHP函数补完:stream_context_create()模拟POST/GET。
上面提到的两个stream系列的函数都是类似包装器的流,作用在某种协议的输入输出流上。这样的使用方式和概念,其实和Java中的流并没有大的区别,比如Java中经常有这样的写法:
代码如下:new DataOutputStream(new BufferedOutputStream(new FileOutputStream(new File(fileName))));
一层流嵌套着另外一层流,和PHP里有异曲同工之妙。
我们再来看个过滤器流的作用:
代码如下:$fp = fopen('c:/test.txt', 'w+');
/* 把rot13过滤器作用在写入流上 */stream_filter_append($fp, "string.rot13", STREAM_FILTER_WRITE);
/* 写入的数据经过rot13过滤器的处理*/fwrite($fp, "This is a test\n");rewind($fp);
/* 读取写入的数据,独到的自然是被处理过的字符了 */fpassthru($fp);fclose($fp);
// output:Guvf vf n grfg
在上面的例子中,如果我们把过滤器的类型设置为STREAM_FILTER_ALL,即同时作用在读写流上,那么读写的数据都将被rot13过滤器处理,我们读出的数据就和写入的原始数据是一致的。
你可能会奇怪stream_filter_append中的 "string.rot13"这个变量来的莫名其妙,这实际上是PHP内置的一个过滤器。
使用下面的方法即可打印出PHP内置的流:
代码如下:streamlist = stream_get_filters();print_r($streamlist);
代码如下:Array(&&& [0] =& convert.iconv.*&&& [1] =& mcrypt.*&&& [2] =& mdecrypt.*&&& [3] =& string.rot13&&& [4] =& string.toupper&&& [5] =& string.tolower&&& [6] =& string.strip_tags&&& [7] =& convert.*&&& [8] =& consumed&&& [9] =& dechunk&&& [10] =& zlib.*&&& [11] =& bzip2.*)
自然而然,我们会想到定义自己的过滤器,这个也不难:
代码如下:class md5_filter extends php_user_filter{&&& function filter($in, $out, &$consumed, $closing)&&& {&&&&&&& while ($bucket = stream_bucket_make_writeable($in))&&&&&&& {&&&&&&&&&&& $bucket-&data = md5($bucket-&data);&&&&&&&&&&& $consumed += $bucket-&&&&&&&&&&&& stream_bucket_append($out, $bucket);&&&&&&& }
&&&&&&& //数据处理成功,可供其它管道读取&&&&&&& return PSFS_PASS_ON;&&& }}stream_filter_register("string.md5", "md5_filter");
注意:过滤器名可以随意取。
之后就可以使用"string.md5"这个我们自定义的过滤器了。
这个过滤器的写法看起来很是有点摸不着头脑,事实上我们只需要看一下php_user_filter这个类的结构和内置方法即了解了。
过滤器流最适合做的就是文件格式转换了,包括压缩,编解码等,除了这些“偏门”的用法外,filter流更有用的一个地方在于调试和日志功能,比如说在socket开发中,注册一个过滤器流进行log记录。比如下面的例子:
代码如下:class md5_filter extends php_user_filter{&&& public function filter($in, $out, &$consumed, $closing)&&& {&&&&&&& $data="";&&&&&&& while ($bucket = stream_bucket_make_writeable($in))&&&&&&& {&&&&&&&&&&& $bucket-&data = md5($bucket-&data);&&&&&&&&&&& $consumed += $bucket-&&&&&&&&&&&& stream_bucket_append($out, $bucket);&&&&&&& }
&&&&&&& call_user_func($this-&params, $data);&&&&&&& return PSFS_PASS_ON;&&& }}
$callback = function($data){&&& file_put_contents("c:\log.txt",date("Y-m-d H:i")."\r\n");};
这个过滤器不仅可以对输入流进行处理,还能回调一个函数来进行日志记录。
可以这么使用:
代码如下:stream_filter_prepend($fp, "string.md5", STREAM_FILTER_WRITE,$callback);
PHP中的stream流系列函数中还有一个很重要的流,就是包装类流 streamWrapper。使用包装流可以使得不同类型的协议使用相同的接口操纵数据。
您可能感兴趣的文章:
大家感兴趣的内容
12345678910
最近更新的内容
常用在线小工具Spark-Streaming Windows开发环境案例搭建运行(图文) - 简书
<div class="fixed-btn note-fixed-download" data-toggle="popover" data-placement="left" data-html="true" data-trigger="hover" data-content=''>
写了8054字,被24人关注,获得了18个喜欢
Spark-Streaming Windows开发环境案例搭建运行(图文)
最近在专注Spark开发,记录下自己的工作和学习路程,希望能跟大家互相交流成长具体代码可参考本人GitHub地址:本文章对应代码地址:具体代码实现以及思路请参考笔者之前发布的文章:鉴于文章篇幅有限,关于Maven/InteliiJ IDEA/Scala等知识请自行补充本文章发布后会及时更新文章中出现的错误及增加内容,欢迎大家订阅QQ: 微信:guofei1990123
Spark运行模式有 local/standalone等等,为了方便开发测试开发过程中使用Local模式运行
本地开发环境介绍
开发工具:IntelliJ IDEA 打包工具 : apache-maven-3.3.9Spark版本:1.3.0JDK版本:jdk1.8.0_66Scala SDK版本:2.10.4Kafka版本:kafka_2.10系统版本:Windows 10旗舰版本机IP:192.168.61.1
实现思路及部分代码
模拟一个Kafka消息生产者往对应 Kafka Topic写数据,核心逻辑如下:
val topic = "user_events"
val brokers = "hc4:9092"
val props = new Properties()
props.put("metadata.broker.list", brokers)
props.put("serializer.class", "kafka.serializer.StringEncoder")
val kafkaConfig = new ProducerConfig(props)
val producer = new Producer[String, String](kafkaConfig)
while(true) {
// prepare event data
val event = new JSONObject()
.put("uid", UUID.randomUUID())//随机生成用户id
.put("event_time", System.currentTimeMillis.toString) //记录时间发生时间
.put("os_type", getOsType) //设备类型
.put("click_count", click) //点击次数
// produce event message
producer.send(new KeyedMessage[String, String](topic, event.toString))
println("Message sent: " + event)
Thread.sleep(200)
Spark-Streaming程序消费对应 Kafka Topic中数据并做相关业务逻辑操作Streaming程序消费Kafka数据核心逻辑如下:
// Kafka Topic
val topics = Set("user_events")
// Kafka brokers
val brokers = "hc4:9092"
val kafkaParams = Map[String, String](
"metadata.broker.list" -& brokers,
"serializer.class" -& "kafka.serializer.StringEncoder")
// Create a direct stream
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
统计结果存储结果数据保存HBase核心逻辑代码:
userClicks.foreachRDD拿到的是微批处理一个批次数据
rdd.foreachPartition拿到的是一个批次在Spark各节点对应的分区数据
partitionOfRecords.foreach拿到对应分区的每条数据
userClicks.foreachRDD(rdd =& {
rdd.foreachPartition(partitionOfRecords =& {
//Hbase配置
val tableName = "PageViewStream"
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "hc4:9092")
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
hbaseConf.set("hbase.defaults.for.version.skip", "true")
partitionOfRecords.foreach(pair =& {
val uid = pair._1
//点击次数
val click = pair._2
System.out.println("uid: "+uid+" click: "+click)
//组装数据
create 'PageViewStream','Stat'
val put = new Put(Bytes.toBytes(uid))
put.add("Stat".getBytes, "ClickStat".getBytes, Bytes.toBytes(click))
val StatTable = new HTable(hbaseConf, TableName.valueOf(tableName))
StatTable.setAutoFlush(false, false)
//写入数据缓存
StatTable.setWriteBufferSize(3*)
StatTable.put(put)
StatTable.flushCommits()
运行Kafka生产者模拟器(KafkaMessageGenerator)附加:程序打包到Linux环境执行使用//java -classpath ./spark-streaming-1.0-SNAPSHOT-shaded.jar guofei.KafkaEventProducer
java -classpath Jar包路径
KafkaMessageGenerator类全路径
IntelliJ IDEA.jpg
运行Spark-Streaming主程序(PageViewStream),浏览器打开Spark UI界面,下图为Job运行情况,URL地址:
SparkUI.jpg
通过hbase客户端(hbase shell)查看对应表统计的数据hbase shell
scan 'PageViewStream'
HBase Cli.jpg
运行Streaming主程序报找不到 hadoop二进制文件
Failed to locate the winutils binary in the hadoop binary path
Streaming本地运行模式需要本地装有配置好 HADOOP_HOME的hadoop环境解决:解压window平台下编译的hadoop组建,配置环境变量HADOOP_HOME并重启IDEA
权限验证失败
SecurityManager: aut users with view permissions: Set(hc-3450); users with modify permissions: Set(hc-3450)
Exception in thread "main" java.lang.NoSuchMethodError: scala.collection.immutable.HashSet$.empty()Lscala/collection/immutable/HashS
原因:Scala SDK版本与Spark和Kafka内置Scala版本不一致解决:Scala SDK换成Spark和Kafka对应的Scala版本
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
打开微信“扫一扫”,打开网页后点击屏幕右上角分享按钮
被以下专题收入,发现更多相似内容:
大数据,收录大数据相关技术的文章。
· 3239人关注
专注于spark相关内容分享,github相关资料整理:/jacksu/utils4s
· 291人关注
如何高效利用spark解决问题的文章
· 192人关注
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
选择支付方式:

我要回帖

更多关于 direct3dcreate9 的文章

 

随机推荐