dstream收到listjs怎么处理list

使用ARM DS-5与Dstream StreamLine进行Android底层性能分析的一个实例
一个类似于Android的OS,只使用了BT机能的状态下,CPU的占有率超过20%,于是我们想看看是什么原因。本篇文章注意介绍了使用Dstream StreamLine来进行性能分析的过程和实例以及可能需要注意的地方。
StreamLine准备
使用StreamLine来分析性能主要包含以下几个过程
配置内核使得内核可以产生一些性能相关的数据,以及一些设施用以支持gator,例如:高精度的timer(hr_timer)安装gator模块,用于系统性能数据(scheduler,event,Process)的采集以及对这些数据进行注释(annotate),例如对CP15中的PMNC(Performance Monitor Control Register)寄存器的读取与配置,对调度器数据的采集gatord从gator内核模块中获取性能数据,并通过网络(不一定是网线)传递给Host PC中的DS-5DS-5中的streamline对应软件模块对性能数据进行分析,以及与模块代码做出对应
对于前面的第一、第二步骤可以参考ARM官方的说明文档:
编译与环境准备时候的注意点
如果使用的是adb 来让gatord传输数据到Host PC,那么需要将其中gator kernel module中的宏去掉(注释掉):
If you are building for an Android target, you must remove the comment hashtag from the following line in the makefile of the gator module to enable kernel stack unwinding
# EXTRA_CFLAGS += -DGATOR_KERNEL_STACK_UNWINDING
StreamLine数据采集与配置
gatord 端口号被占用问题
默认情况下,gatord使用的端口号为8080,但是在运行了许多应用程序的OS中,有可能这个端口号已经被占用了,如果被占用了,那么gatord在运行的时候,会出现如下的log提示,询问我们是否已经运行了一个gatord实例了:
Is an instance already running?
此时,我们可以使用ps 来查看一下是否已经存在运行的gatord,如果没有,那么可以看看这个端口号被谁使用了:
lsof &| grep 8080
如果没有lsof(android),那么可以使用netstat来查看:
&netstat -ltpn | grep 8080
另外也可以修改代码,打印出提示log对应的errno,在gator-daemon中的main.cpp中,添加errno的头文件,在bind IP地址失败后打印出errno来。
如果确实被占用了,那么我们需要更改这个端口号,可以直接对main.cpp中的端口号8080进行更改,也可以在运行gatord的时候,使用-p选项进行指定,例如将端口号指定为8888:
gatord -p 8888&
使用USB-ADB来连接Target与Host
如果没有网线,但是有adb(在android手机中,几乎都有),那么就可以使用adb
在PC中将远程gatord的端口号转发到本地的某个端口号,例如也是8888:
adb forward tcp:8888 tcp:8888
性能数据的采集
从DS-5 Eclipse中选择Window & Show View & Other & DS-5 & ARM Streamline Data
然后点击齿轮配置Capture。
因为已经使用adb将远程的gatord输出重定向了本地PC上的8888端口,所以在Address中填入localhost:8888。最后面的Program Images可以添加elf文件,从而可以加载symbols。添加elf image文件,也可以在采集完成后再添加。这里以采集完成后添加为例。
添加少量的elf文件用于分析
在完成了Capture以后,可以添加elf文件,从而使各个进程都有symbols,点击下图中的齿轮,进入配置页面。
Figure: Setting
在弹出的设置对话框中,在Program Images中可以添加elf文件,但是这里只能一个个的添加,无法大量按照特殊要求添加(例如只添加libQt***):
Figure: Add_ELF_Files
添加大量的elf文件用于分析
如果要添加大量的elf文件,或者要按照我们特别的需求来添加大量的elf文件,那么就需要其他方法。
从前面的Figure: Setting图中,方框标明了这个Capture文件的位置:
/home/hexiongjun/Documents/Streamline/Test.apc/ , 其中添加了的elf文件记录在session.xml文件中。
打开这个Capture session的session.xml,添加所有的so和vmlinux文件路径,例如添加了几个的样子:
&?xml version=&1.0& encoding=&UTF-8&?&
&session version=&1& output_path=&x& call_stack_unwinding=&yes& parse_debug_info=&yes& high_resolution=&no& buffer_mode=&streaming& sample_rate=&normal& duration=&0& target_type=&ARM - Streamline Agent& target_address=&localhost:8888& live_rate=&100&&
&image path=&${streamline_results}/Test.apc/vmlinux&/&
&image path=&${streamline_results}/Test.apc/app.so&/&
&image path=&${streamline_results}/Test.apc/ld-linux.so.3&/&
&image path=&${streamline_results}/Test.apc/libc.so.6&/&
&image path=&${streamline_results}/Test.apc/libQtGui.so&/&
&image path=&${streamline_results}/Test.apc/libQtCore.so&/&
&image path=&/home/hexiongjun/work/obj/SHARED_LIBRARIES/libpthread.so.0_intermediates/LINKED/libpthread.so.0&/&
&energy_capture version=&1& type=&none&&
&channel id=&0& resistance=&20& power=&yes&/&
&/energy_capture&
&/session&
session.xml (END)
注意红色字体的部分,就是添加了的几个文件。同时注意这些elf文件的路径位置,它们被拷贝到了/home/hexiongjun/Documents/Streamline/Test.apc/中。
因此,现在问题就转换成了在session.xml文件中按照格式添加文件列表了。对于需要添加的的文件我们假设有两种:
添加所有elf的文件:内核+OS+App只添加符号某一类条件的文件:例如libQt***
添加所有elf的文件
为了表示最极端的情况,这里假设需要将所有的OS相关elf文件、以及APP elf文件都添加进来。那么其实就是:
kernel相关:vmlinux + kernel modules(ko)Application相关:可执行的应用程序,应用程序依赖的shared objects,以及so依赖的so
对于前者,直接添加即可,如果kernel module比较多,那么可以直接在kernel module_install中find一把,然后重定向到一个文件中,即可得到所有的ko文件路径列表。
对于后者,需要根据实际情况来处理,如果是Android环境下,那么所有的没有strip过的elf文件都放在:
out/target/$TARGETPRODUCT/symbols
$OUT/symbols
然后,使用find命令以及realpath命令来获取这些文件路径,然后拷贝到session.xml中,并使用编译器或者sed/awk,让这些文件列表符合xml语言语法接口,例如在VIM中可以使用下面命令来添加每行的结束字符串&/&:
:%s/$/\&\/\&
只添加符号某一类条件的文件
要包含某一类的lib,可以使用realpath:
realpath all_libs/libQt* && ../list.txt
realpath all_libs/libqt* && ../list.txt
StreamLine采集数据的分析
在添加好elf image文件之后,就可以双击采集好的session来进行分析了,但是如果添加的文件比较多,有可能会出现如下的提示:
java heap space
对于此问题,可以参考我在ARM社区的提问:
分析完成后,可以在Timeline标签中看到各个Process的CPU占有率。如下图中,bt线程使用了21.3%的CPU:
然后我们需要找到都是那些函数占用的CPU,切换到Call Paths标签可以看到:
出于隐私等因素,对函数和线程名称进行了模糊处理,但是CPU主要用于了串口通讯。然后根据我们的代码和对串口的使用方法知道我们没有使用DMA来传输数据,而BT本身有大量的数据产生。因此修改代码使用DMA来传输数据,然后对比CPU的占用率,就知道是否是这里的问题了。
如果文章有格式问题,请移步:
转载请注明出处。作者:&
看过本文的人也看了:
我要留言技术领域:
取消收藏确定要取消收藏吗?
删除图谱提示你保存在该图谱下的知识内容也会被删除,建议你先将内容移到其他图谱中。你确定要删除知识图谱及其内容吗?
删除节点提示无法删除该知识节点,因该节点下仍保存有相关知识内容!
删除节点提示你确定要删除该知识节点吗?Spark源码解析:DStream - 推酷
Spark源码解析:DStream
本篇是Spark源码解析的第二篇,主要通过源码分析Spark Streaming设计中最重要的一个概念——DStream。
本篇主要来分析Spark Streaming中的Dstream,重要性不必多讲,明白了Spark这个几个数据结构,容易对Spark有一个整体的把握。
和RDD那篇文章类似,虽说是分析Dstream,但是整篇文章会围绕着一个具体的例子来展开。算是对Spark Streaming源码的一个概览。
Spark Streaming的一些概念,主要和Dstream相关
Dstream的整体设计
通过一个具体例子深入讲解
什么是Spark Streaming
Scalable, high-throughput, fault-tolerant stream processing of live data streams!
一个实时系统,或者说是准实时系统。详细不再描述。
提一点就是,Streaming 的任务最后都会转化为Spark任务,由Spark引擎来执行。
It represents a continuous stream of data, either the input data stream received from source, or the processed data stream generated by transforming the input stream.
RDD 的定义是一个只读、分区的数据集(an RDD is a read-only, partitioned collection of records),而 DStream 又是 RDD 的模板,所以我们把 Dstream 也视同数据集。
我的简单理解,Dstream是在RDD上面又封了一层的数据结构。下面是官网对Dstream描述的图。
Spark Streaming和其它实时处理程序的区别
此处是来自Spark作者的论文,写的很好,我就不翻译了,摘出来我关注的点。
我们把实时处理框架分为两种:Record-at-a-time和D-Stream processing model。
Record-at-a-time:
D-Stream processing model:
两者的区别:
Record-at-a-time processing model. Each node continuously receives records, updates internal state, and sends new records. Fault tolerance is typically achieved through replication, using a synchronization protocol like Flux or DPC to ensure that replicas of each node see records in the same order (e.g., when they have multiple parent nodes).
D-Stream processing model. In each time interval, the records that arrive are stored reliably across the cluster to form an immutable, partitioned dataset. This is then processed via deterministic parallel operations to compute other distributed datasets that represent program output or state to pass to the next interval. Each series of datasets forms one D-Stream.
Record-at-a-time的问题:
In a record-at-a-time system, the major recovery challenge is rebuilding the state of a lost, or slow, node.
0x02 源码分析
A DStream internally is characterized by a few basic properties:
A list of other DStreams that the DStream depends on
A time interval at which the DStream generates an RDD
A function that is used to generate an RDD after each time interval
Dstream这个数据结构有三块比较重要。
生成RDD的时间间隔
一个生成RDD的function
这些对应到代码中的话如下,这些都会有具体的子类来实现,我们在后面的分析中就能看到。 下面先顺着例子一点点讲。
abstract class DStream[T: ClassTag] ( @transient private[streaming] var ssc: StreamingContext ) extends Serializable with Logging {
/** Time interval after which the DStream generates an RDD */
def slideDuration: Duration
/** List of parent DStreams on which this DStream depends on */
def dependencies: List[DStream[_]]
/** Method that generates an RDD for the given time */
def compute(validTime: Time): Option[RDD[T]]
// RDDs generated, marked as private[streaming] so that testsuites can access it
@transient
private[streaming] var generatedRDDs = new HashMap[Time, RDD[T]]()
// Reference to whole DStream graph
private[streaming] var graph: DStreamGraph = null
官网最基本的wordcount例子,和Spark的类似。虽简单,但是代表性很强。
val conf = new SparkConf().setMaster(&local[2]&).setAppName(&NetworkWordCount&)
val ssc = new StreamingContext(conf, Seconds(1))
val lines = ssc.socketTextStream(&localhost&, 9999)
val words = lines.flatMap(_.split(& &))
val pairs = words.map(word =& (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()
// Start the computation
ssc.awaitTermination()
这里涉及到了Dstream之间的转换以及RDD的生成。在这里先看一下Dstream的转换。
Dstream依赖关系
Dstream的一些依赖关系还是要先弄明白的,不然不太容易理解。Dstream依赖图很大,我们只列几个这次关注的。
这里不再详细介绍每一个组件,只放一个图,后面在看源码的时候可以回过头再看,会更清晰。
1. 源码分析: StreamingContext 类
StreamingContext的主要组成,这里我们不再展开讲StreamingContext的作用,我们先讲这个具体的例子,后面会有专门的博客来分析其中一些主要的组件,比如DstreamGraph和JobGenerator。
JobScheduler : 用于定期生成Spark Job
JobGenerator
JobExecutor
DstreamGraph:包含Dstream之间依赖关系的容器
StreamingJobProgressListener:监听Streaming Job,更新StreamingTab
StreamingTab:Streaming Job的标签页
SparkUI负责展示
class StreamingContext private[streaming] (
_sc: SparkContext,
_cp: Checkpoint,
_batchDur: Duration
) extends Logging {...}
先看第一行代码做了什么, val lines = ssc.socketTextStream(&localhost&, 9999) ,看过RDD源码的应该会记得,这一行代码就会做很多Dstream的转换,下面我们慢慢看。
socketTextStream 返回的时一个SocketInputDStream,那么SocketInputDStream是个什么东西?
def socketTextStream(
hostname: String,
port: Int,
storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2
): ReceiverInputDStream[String] = withNamedScope(&socket text stream&) {
socketStream[String](hostname, port, SocketReceiver.bytesToLines, storageLevel)
def socketStream[T: ClassTag](
hostname: String,
port: Int,
converter: (InputStream) =& Iterator[T],
storageLevel: StorageLevel
): ReceiverInputDStream[T] = {
new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
2. 源码分析: SocketInputDStream 类
这里我们看到SocketInputDStream其实继承了ReceiverInputDStream,这里就出现了第一层的继承关系,可以回头看一下前面的那个图。
它里面没做太多的东西,主要自己写了一个SocketReceiver,其余的主要方法都继承自ReceiverInputDStream。
class SocketInputDStream[T: ClassTag](
_ssc: StreamingContext,
host: String,
port: Int,
bytesToObjects: InputStream =& Iterator[T],
storageLevel: StorageLevel
) extends ReceiverInputDStream[T](_ssc) {
def getReceiver(): Receiver[T] = {
new SocketReceiver(host, port, bytesToObjects, storageLevel)
3. 源码分析: ReceiverInputDStream 类
ReceiverInputDStream是一个比较重要的类,有很大一部分的Dstream都继承于它。 比如说Kafka的InputDStream。所以说这是一个比较关键的类。
Abstract class for defining any [[org.apache.spark.streaming.dstream.InputDStream]] that has to start a receiver on worker nodes to receive external data. Specific implementations of ReceiverInputDStream must define [[getReceiver]] function that gets the receiver object of type [[org.apache.spark.streaming.receiver.Receiver]] that will be sent to the workers to receive data.
注意:这里重写了一个重要的方法compute。它决定了如何生成RDD。
另外ReceiverInputDStream继承自InputDStream。
abstract class ReceiverInputDStream[T: ClassTag](_ssc: StreamingContext)
extends InputDStream[T](_ssc) {
* Generates RDDs with blocks received by the receiver of this stream. */
override def compute(validTime: Time): Option[RDD[T]] = {
val blockRDD = {
if (validTime & graph.startTime) {
// If this is called for any time before the start time of the context,
// then this returns an empty RDD. This may happen when recovering from a
// driver failure without any write ahead log to recover pre-failure data.
new BlockRDD[T](ssc.sc, Array.empty)
// Otherwise, ask the tracker for all the blocks that have been allocated to this stream
// for this batch
val receiverTracker = ssc.scheduler.receiverTracker
val blockInfos = receiverTracker.getBlocksOfBatch(validTime).getOrElse(id, Seq.empty)
// Register the input blocks information into InputInfoTracker
val inputInfo = StreamInputInfo(id, blockInfos.flatMap(_.numRecords).sum)
ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
// Create the BlockRDD
createBlockRDD(validTime, blockInfos)
Some(blockRDD)
4. 源码分析: InputDStream 类
InputDStream是一个比较重要的抽象,它是所有和Input相关Dstream的抽象类。比如FileInputDStream和我们刚才看的ReceiverInputDStream。
This is the abstract base class for all input streams. This class provides methods start() and stop() which are called by Spark Streaming system to start and stop receiving data, respectively.
Input streams that can generate RDDs from new data by running a service/thread only on the driver node (that is, without running a receiver on worker nodes), can be implemented by directly inheriting this InputDStream.
For example, FileInputDStream, a subclass of InputDStream, monitors a HDFS directory from the driver for new files and generates RDDs with the new files.
For implementing input streams that requires running a receiver on the worker nodes, use [[org.apache.spark.streaming.dstream.ReceiverInputDStream]] as the parent class.
abstract class InputDStream[T: ClassTag](_ssc: StreamingContext) extends DStream[T](_ssc) {
override def dependencies: List[DStream[_]] = List()
override def slideDuration: Duration = {
if (ssc == null) throw new Exception(&ssc is null&)
if (ssc.graph.batchDuration == null) throw new Exception(&batchDuration is null&)
ssc.graph.batchDuration
注意:到这里,才看完了第一行代码,就是那个读数据的那一行。
5. 源码分析: Dstream.flatMap 方法(以及Dstream如何生成RDD)
Dstream前面已经做过了一些介绍,不再赘述,这里开始按照例子的顺序向下讲。
看我们的第一个转换flatMap。返回了个FlatMappedDStream,并传入一个function。
def flatMap[U: ClassTag](flatMapFunc: T =& TraversableOnce[U]): DStream[U] = ssc.withScope {
new FlatMappedDStream(this, context.sparkContext.clean(flatMapFunc))
下面转到FlatMappedDStream的分析,里面会设计到如何生存RDD的操作。
class FlatMappedDStream[T: ClassTag, U: ClassTag](
parent: DStream[T],
flatMapFunc: T =& TraversableOnce[U]
) extends DStream[U](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.flatMap(flatMapFunc))
DStream如何生成RDD?
Get the RDD correspondi either retrieve it from cache or compute-and-cache it.
DStream 内部用一个类型是 HashMap 的变量 generatedRDD 来记录已经生成过的 RDD。
注意:compute(time)是用来生成rdd的。
private[streaming] final def getOrCompute(time: Time): Option[RDD[T]] = {
/ 从 generatedRDDs 里 来取rdd:如果有 rdd 就返回,没有 rdd 就进行 orElse 的代码
generatedRDDs.get(time).orElse {
// Compute the RDD if time is valid (e.g. correct time in a sliding window)
// of RDD generation, else generate nothing.
// 验证time是否valid
if (isTimeValid(time)) {
// 此处调用 compute(time) 方法获得 rdd 实例,并存入 rddOption 变量
val rddOption = createRDDWithLocalProperties(time, displayInnerRDDOps = false) {
// Disable checks for existing output directories in jobs launched by the streaming
// scheduler, since we may need to write output to an existing directory during checkpoint
// see SPARK-4835 for more details. We need to have this call here because
// compute() might cause Spark jobs to be launched.
// 这个函数在RDD的代码里面,看了一下不是很理解,只能通过注释知道大概意思是不检查输出目录。
PairRDDFunctions.disableOutputSpecValidation.withValue(true) {
compute(time)
rddOption.foreach { case newRDD =&
// Register the generated RDD for caching and checkpointing
if (storageLevel != StorageLevel.NONE) {
newRDD.persist(storageLevel)
logDebug(s&Persisting RDD ${newRDD.id} for time $time to $storageLevel&)
if (checkpointDuration != null && (time - zeroTime).isMultipleOf(checkpointDuration)) {
newRDD.checkpoint()
logInfo(s&Marking RDD ${newRDD.id} for time $time for checkpointing&)
// 将刚刚实例化出来的 rddOption 放入 generatedRDDs 对应的 time 位置
generatedRDDs.put(time, newRDD)
6. 源码分析: Dstream.map 方法
/** Return a new DStream by applying a function to all elements of this DStream. */
def map[U: ClassTag](mapFunc: T =& U): DStream[U] = ssc.withScope {
new MappedDStream(this, context.sparkContext.clean(mapFunc))
此处值得说明一下,看compute函数 parent.getOrCompute(validTime).map(_.map[U](mapFunc)) ,在这里同样调用了Dstream的getOrCompute函数,由于validTime已经存在,因此不重新生成RDD,而是从generatedRDDs中取出来。
然后再执行 .map(_.map[U](mapFunc)) 这部分。
class MappedDStream[T: ClassTag, U: ClassTag] (
parent: DStream[T],
mapFunc: T =& U
) extends DStream[U](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[U]] = {
parent.getOrCompute(validTime).map(_.map[U](mapFunc))
7. 源码分析: reduceByKey 方法
有了看RDD源码的经验,我们很容易找到reduceByKey是在PairDStreamFunctions类中的。下面看一下它的源码。
Return a new DStream by applying reduceByKey to each RDD. The values for each key are merged using the supplied reduce function. org.apache.spark.Partitioner is used to control the partitioning of each RDD.
def reduceByKey(
reduceFunc: (V, V) =& V,
partitioner: Partitioner): DStream[(K, V)] = ssc.withScope {
combineByKey((v: V) =& v, reduceFunc, reduceFunc, partitioner)
Combine elements of each key in DStream’s RDDs using custom functions. This is similar to the combineByKey for RDDs.
此处,我们仿佛看到了套路,感觉和RDD的设计何其的一致。
这里来了一个ShuffledDStream,具体的Shuffle过程可能会有一点小复杂,暂时不讲,关于shuffle的内容需要再详细地理解一下。
def combineByKey[C: ClassTag](
createCombiner: V =& C,
mergeValue: (C, V) =& C,
mergeCombiner: (C, C) =& C,
partitioner: Partitioner,
mapSideCombine: Boolean = true): DStream[(K, C)] = ssc.withScope {
val cleanedCreateCombiner = sparkContext.clean(createCombiner)
val cleanedMergeValue = sparkContext.clean(mergeValue)
val cleanedMergeCombiner = sparkContext.clean(mergeCombiner)
new ShuffledDStream[K, V, C](
cleanedCreateCombiner,
cleanedMergeValue,
cleanedMergeCombiner,
partitioner,
mapSideCombine)
8. 源码分析: DStream.print 方法
最后的打印函数也有点意思,它调用的时Dstream的print函数。
看 firstNum.take(num).foreach(println) 这一句,打印出了rdd的内容。
def print(num: Int): Unit = ssc.withScope {
def foreachFunc: (RDD[T], Time) =& Unit = {
(rdd: RDD[T], time: Time) =& {
val firstNum = rdd.take(num + 1)
// scalastyle:off println
println(&-------------------------------------------&)
println(s&Time: $time&)
println(&-------------------------------------------&)
firstNum.take(num).foreach(println)
if (firstNum.length & num) println(&...&)
// scalastyle:on println
foreachRDD(context.sparkContext.clean(foreachFunc), displayInnerRDDOps = false)
我又发现了一个新的Dstream:ForEachDStream。按照注释来讲,上面的print的操作应该生成的时一个ForEachDStream不过,没找到代码。只能暂时搁置。
An internal DStream used to represent output operations like DStream.foreachRDD.
class ForEachDStream[T: ClassTag] (
parent: DStream[T],
foreachFunc: (RDD[T], Time) =& Unit,
displayInnerRDDOps: Boolean
) extends DStream[Unit](parent.ssc) {
override def dependencies: List[DStream[_]] = List(parent)
override def slideDuration: Duration = parent.slideDuration
override def compute(validTime: Time): Option[RDD[Unit]] = None
override def generateJob(time: Time): Option[Job] = {
parent.getOrCompute(time) match {
case Some(rdd) =&
val jobFunc = () =& createRDDWithLocalProperties(time, displayInnerRDDOps) {
foreachFunc(rdd, time)
Some(new Job(time, jobFunc))
case None =& None
至此,分析完了Dstream的相关源码,这篇和RDD那篇相对来讲都比较基础,主要是对整个流程的梳理,后续会对一些细节的点进行分析。
Matei Zaharia’s paper (paper写的真心好)
http://spark.apache.org/
/lw-lin/CoolplaySpark/tree/master/Spark%20Streaming%20%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90%E7%B3%BB%E5%88%97
23:16:00 lkds 作者:
文章推荐:/readme个人主页:
文章可以转载, 但必须以超链接形式标明文章原始出处和作者信息
已发表评论数()
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
标题不准确
排版有问题
主题不准确
没有分页内容
图片无法显示
视频无法显示
与原文不一致

我要回帖

更多关于 list分批处理 的文章

 

随机推荐