超级账本 hyperledgerr fabric怎么记录交易

Blockchian区块链:IBM HyperLedger fabric 简述
在我看来,比特币就是现实中的V字仇杀队,当然现实是更残酷的世界政府,这场博弈关乎着人类文明、政治、社会属性、经济和人权。
IBM HyperLeger 又叫 fabric,你可以把它想象成一个由全社会来共同维护的一个超级账本,没有中心机构拥揽权力,你的每一笔交易都是全网公开且安全的,信用由全社会共同见证。它与Bitcoin的关系就是,你可以利用fabric构建出一个叫Bitcoin的应用来帮助你change the world。
愿景是那么的牛X,貌似正合我们想改变世界的胃口,但是在残酷的现实和世界面前我们永远是天真幼稚的,blockchain需要一步一步脚印来构建它的宏伟蓝图,起码目前是没有将它用于工业生产和国家经济的案例的。
fabric源于IBM,初衷为了服务于工业生产,IBM将44,000行代码开源,是了不起的贡献,让我们可以有机会如此近的去探究区块链的原理,但毕竟IBM是从自身利益和客户利益出发的,并不是毫无目的的去做这项公益事业,我们在看fabric的同时要有一种审慎的思维:区块链不一定非得这样,它跟比特币最本质的非技术区别在哪里。我们先来大致了解一下fabric的关键术语(因为一些词汇用英文更准确,我就不硬翻译了)。
1. Terminology
Transaction它一条request,用来在ledger上执行一个function,这个function是用chaincode来实现的
Transactor发出transaction的实体,比如它可能是一个客户端应用
LedgerLegder可以理解为一串经过的block链条,每一个block包含着transactions和当前world state等信息
World Stateworld state是一组变量的集合,包含着transactions的执行结果
Chaincode这是一段应用层面的代码(又叫smart contract,智能合约),它存储在ledger上,作为transaction的一部分。也就是说chaincode来运行transaction,然后运行结果可能会修改world state
Validating Peer参与者之一,它是一种在网络里负责执行一致性协议、确认交易和维护账本的计算机节点
Nonvalidating Peer它相当于一个代理节点,用来连接transactor和邻近的VP(Validating Peer)节点。一个NVP节点不会去执行transactions但是回去验证它们。同时它也会承担起事件流server和提供REST services的角色
Permissioned Ledger这是一个要求每一个实体和节点都要成为网络成员的blockchain网络,所有匿名节点都不被允许连接
Privacy用来保护和隐蔽chain transactors的身份,当网络成员要检查交易时,如果没有特权的话,是无法追踪到交易的transactor
Confidentiality这个特性使得交易内容不是对所有人可见,只开放给利益相关者
Auditability将blockchain用于商业用途需要遵守规则,方便监管者调查交易记录
2. Architecture
核心逻辑有三条:Membership、Blockchain和Chaincode。
2.1 Membership Services
这项服务用来管理节点身份、隐私、confidentiality 和 auditability。在一个 non-permissioned的区块链网络里,参与者不要求授权,所有的节点被视作一样,都可以去submit一个transaction,去把这些交易存到区块(blocks)中。那Membership Service是要将一个 non-permissioned的区块链网络变成一个permissioned的区块链网络,凭借着Public Key Infrastructure (PKI)、去中心和一致性。
2.2 Blockchain Services
Blockchain services使用建立在HTTP/2上的P2P协议来管理分布式账本。提供最有效的哈希算法来维护world state的副本。采取可插拔的方式来根据具体需求来设置共识协议,比如PBFT,Raft,PoW和PoS等等。
2.3 Chaincode Services
Chaincode services 会提供一种安全且轻量级的沙盒运行模式,来在VP节点上执行chaincode逻辑。这里使用环境,里面的base镜像都是经过签名验证的安全镜像,包括OS层和开发chaincode的语言、runtime和SDK层,目前支持Go、Jave和Nodejs开发语言。
2.4 Events
在blockchain网络里,VP节点和chaincode会发送events来触发一些监听动作。比如chaincode是用户代码,它可以产生用户事件。
2.5 API 和 CLI
提供REST API,允许注册用户、查询blockchain和发送transactions。一些针对chaincode的API,可以用来执行transactions和查询交易结果。对于开发者,可以通过CLI快速去测试chaincode,或者去查询交易状态。
3. Topology
分布式网络的拓扑结构是非常值得研究的。在这个世界里散布着众多参与者,不同角色,不同利益体,各种各样的情况处理象征着分布式网络里的规则和法律,无规则不成方圆。在区块链网络里,有Membership service,有VP节点,NVP节点,一个或多个应用,它们形成一个chain,然后会有多个chain,每一个chain都有各自的安全要求和操作需求。
3.1 单个VP节点网络
最简单的网络就是只包含一个VP节点,因此就省去了共识部分。
3.2 多个VP节点网络
多个VP和NVP参与的网络才是有价值和实际意义的。NVP节点分担VP节点的工作压力,承担处理API请求和events的工作。
而对于VP节点,VP节点间会组成一个网状网络来传播信息。一个NVP节点如果被允许的话可以与邻近的一个VP节点相连。NVP节点是可以省略的,如果Application可以直接和VP节点通讯。
3.3 Multichain
还会存在一个网络里多条chain的情况,各个chain的意图不一样。
4. Protocol
fabric是用gRPC来做P2P通讯的,是一个双向流消息传递。使用Protocol Buffer来序列化要传递的数据结构。
4.1 Message
message分四种:Discovery,Transaction,Synchronization 和 Consensus。每一种信息下还会包含更多的子信息,由payload指出。
payload是一个不透明的字节数组,它包含着一些对象,比如 Transaction 或者 Response。例如,如果 type 是 CHAIN_TRANSACTION,那么 payload 就是一个 Transaction的对象。
message Message {
enum Type {
UNDEFINED = 0;
DISC_HELLO = 1;
DISC_DISCONNECT = 2;
DISC_GET_PEERS = 3;
DISC_PEERS = 4;
DISC_NEWMSG = 5;
CHAIN_STATUS = 6;
CHAIN_TRANSACTION = 7;
CHAIN_GET_TRANSACTIONS = 8;
CHAIN_QUERY = 9;
SYNC_GET_BLOCKS = 11;
SYNC_BLOCKS = 12;
SYNC_BLOCK_ADDED = 13;
SYNC_STATE_GET_SNAPSHOT = 14;
SYNC_STATE_SNAPSHOT = 15;
SYNC_STATE_GET_DELTAS = 16;
SYNC_STATE_DELTAS = 17;
RESPONSE = 20;
CONSENSUS = 21;
Type type = 1;
bytes payload = 2;
google.protobuf.Timestamp timestamp = 3;
4.1.1 Discovery Messages
一个新启动的节点,如果CORE_PEER_DISCOVERY_ROOTNODE(ROOTNODE是指网络中其它任意一个节点的IP)被指定了,它就会开始运行discovery协议。而ROOTNODE就作为最一开始的发现节点,然后通过ROOTNODE节点进而发现全网中所有的节点。discovery协议信息是DISC_HELLO,它的payload是一个HelloMessage对象,同时包含信息发送节点的信息:
message HelloMessage {
PeerEndpoint peerEndpoint = 1;
uint64 blockNumber = 2;
message PeerEndpoint {
PeerID ID = 1;
string address = 2;
enum Type {
UNDEFINED = 0;
VALIDATOR = 1;
NON_VALIDATOR = 2;
Type type = 3;
bytes pkiID = 4;
message PeerID {
string name = 1;
在启动之初定义的或者在配置文件中定义的该节点的名字
PeerEndpoint
描述该节点,并判断是否是NVP和VP节点
该节点的加密ID
blockNumber
该节点目前拥有的blockchain的高度
如果一个节点接收到DISC_HELLO信息,发现里面的block height高于自己目前的block height,它会立即发送一个同步协议来与全网同步自己的状态(mark:但是在层面似乎并没有实现同步这个逻辑)。
在这个刚加入节点完成DISC_HELLO这轮消息传递后,接下来回周期性的发送DISC_GET_PEERS来发现其它加入网络中的节点。为了回复DISC_GET_PEERS,一个节点会发送DISC_PEERS。
4.1.2 Synchronization Messages
Synchronization 协议是接着上面所说的discovery协议开始的,当一个节点发现它的block的状态跟其它节点不一致时,就会触发同步。该节点会广播(broadcast)三种信息:SYNC_GET_BLOCKS , SYNC_STATE_GET_SNAPSHOT 或者
SYNC_STATE_GET_DELTAS,同时对应接收到三种信息:SYNC_BLOCKS , SYNC_STATE_SNAPSHOT 或者 SYNC_STATE_DELTAS。
目前fabric嵌入的共识算法是pbft。
SYNC_GET_BLOCKS会请求一系列连续的block,发送的数据结构中payload将是一个SyncBlockRange对象。
message SyncBlockRange {
uint64 correlationId = 1;
uint64 start = 2;
uint64 end = 3;
接收的节点会回复SYNC_BLOCKS,它的payload是一个SyncBlocks对象:
message SyncBlocks {
SyncBlockRange range = 1;
repeated Block blocks = 2;
start和end表示起始和结束的block。例如start=3, end=5,代表了block 3,4,5;start=5, end=3,代表了block 5,4,3。
SYNC_STATE_GET_SNAPSHOT会请求当前world state的一个snapshot,该信息的payload是一个SyncStateSnapshotRequest对象:
message SyncStateSnapshotRequest {
uint64 correlationId = 1;
correlationId是发出请求的peer用来追踪对应的该信息的回复。收到该消息的peer会回复SYNC_STATE_SNAPSHOT,它的payload是一个SyncStateSnapshot对象:
message SyncStateSnapshot {
bytes delta = 1;
uint64 sequence = 2;
uint64 blockNumber = 3;
SyncStateSnapshotRequest request = 4;
SYNC_STATE_GET_DELTAS默认Ledger会包含500个transition deltas。delta(j)表示block(i)和block(j)之间的状态转变(i = j -1)。
4.1.3 Consensus Messages
Consensus framework会将接收到的CHAIN_TRANSACTION转变成CONSENSUS,然后广播给所有的VP节点。
4.1.4 Transaction Messages
在fabric中的交易有三种:Deploy, Invoke 和 Query。Deploy将指定的chaincode安装到chain上,Invoke和Query会调用已经部署好的chaincode的函数。
4.2 Ledger
Ledger主要包含两块:blockchain和world state。blockchain就是一系列连在一起的block,用来记录历史交易。world state是一个key-value,当交易执行后,chaincode会将state存在里面。
4.2.1 Blockchain
blockchain是指由一些block连成的list,每一个block都包含上一个block的hash。一个block还会包含一些交易列表以及执行所有这些交易后world state的一个hash。
message Block {
version = 1;
google.protobuf.Timestamp timestamp = 2;
bytes transactionsHash = 3;
bytes stateHash = 4;
bytes previousBlockHash = 5;
bytes consensusMetadata = 6;
NonHashData nonHashData = 7;
message BlockTransactions {
repeated Transaction transactions = 1;
那上一个block的hash是如何计算的呢:
用 protocol buffer 序列化block的信息
用 SHA3 SHAKE256 算法将序列化后的block信息哈希成一个512字节的输出
上面的数据结构中有一个 transactionHash, 它是transaction merkle tree的根节点(用默克尔树来描述这些交易)。
4.2.2 World State
一个peer的world state是所有部署的chaincodes的状态(state)的集合。一个chaincode的状态由键值对(key-value)的集合来描述。我们期望网络里的节点拥有一致的world state,所以会通过计算world state的 crypto-hash 来进行比较,但是将会消耗比较昂贵的算力,为此我们需要设计一个高效率的计算方法。比如引入Bucket-tree来实现world state的组织。
world state中的key的表示为{chaincodeID, ckey},我们可以这样来描述key, key = chaincodeID+nil+cKey。
world state的key-value会存到一个hash表中,这个hash表有预先定义好数量(numBuckets)的buckets组成。一个 hash function 会来定义哪个桶包含哪个key。这些buckets都将作为merkle-tree的叶子节点,编号最小的bucket作为这个merkle-tree最左面的叶子节点。倒数第二层的构建,从左开始每maxGroupingAtEachLevel(预先定义好数量)这么多的叶子节点为一组聚在一起,形成N组,每一组都会插入一个节点作为所包含叶子节点的父节点,这样就形成了倒数第二层。要注意的是,最末层的父节点(就是刚刚描述的插入的节点)可能会有少于maxGroupingAtEachLevel的孩子节点。按照这样的方法不断构建更高一层,直到根节点被构建出来。
举一个例子,{numBuckets=10009 and maxGroupingAtEachLevel=10},它形成的tree的每一次包含的节点数目如下:
Number of nodes
4.3 Consensus Framework
consensus framework包含了三个package:consensus、controller和helper。
<municator用来发送消息给其他的VP节点
onsensus.Executor用于交易的启动、执行和回滚,还有preview、commit
controller指定被VP节点使用的consensus plugin
helper用来帮助consensus plugin与stack交互,例如维护message handler
目前有两个consensus plugin:pbft和noops。
pbft是 微软论文PBFT共识算法的一个实现。
noops 用于开发和测试,它没有共识机制,但是会处理所有consensus message,所以如果要开发自己的consensus plugin,从它开始吧。
5. Implementation and contribution
Implement SYNC_BLOCK_ADDED handler
我的一个同事实现了SYNC_BLOCK_ADDED消息的handler,这样在noops共识模式下,当一个block被加到(mined/added)ledger时,NVP节点就可以处理这条消息了,并将最新加入的block存在它自己的ledger中。
SYNC_BLOCK_ADDED message 对应的callback是beforeBlockAdded(core/peer/handler.go),官方代码如下:
func (d *Handler) beforeBlockAdded(e *fsm.Event) {
peerLogger.Debugf(&Received message: %s&, e.Event)
msg, ok := e.Args[0].(*pb.Message)
e.Cancel(fmt.Errorf(&Received unexpected message type&))
// Add the block and any delta state to the ledger
这里并没有去获取和处理block的信息,我们需要加入如下:
if ValidatorEnabled() {
e.Cancel(fmt.Errorf(&VP shouldn&#39;t receive SYNC_BLOCK_ADDED&))
// Add the block and any delta state to the ledger
blockState := &pb.BlockState{}
err := proto.Unmarshal(msg.Payload, blockState)
if err != nil {
e.Cancel(fmt.Errorf(&Error unmarshalling BlockState: %s&, err))
coord := d.Coordinator
blockHeight := coord.GetBlockchainSize()
if blockHeight &= 0 {
e.Cancel(fmt.Errorf(&No genesis block is made&))
curBlock, err := coord.GetBlockByNumber(blockHeight -1)
if err != nil {
e.Cancel(fmt.Errorf(&Error fetching block #%d, %s&, blockHeight -1, err))
hash, err := curBlock.GetHash()
if err != nil {
e.Cancel(fmt.Errorf(&Error hashing latest block&))
if bytes.Compare(hash, blockState.Block.PreviousBlockHash) != 0 {
e.Cancel(fmt.Errorf(&PreviousBlockHash of received block doesnot match hash of current block&))
coord.PutBlock(blockHeight, blockState.Block)
delta := &statemgmt.StateDelta{}
if err := delta.Unmarshal(blockState.StateDelta)
e.Cancel(fmt.Errorf(&Received a corrupt state delta&))
coord.ApplyStateDelta(msg, delta)
if coord.CommitStateDelta(msg) != nil {
e.Cancel(fmt.Errorf(&Played state forward, hashes matched, but failed to commit, invalidated state&))
peerLogger.Infof(&Blockchain height grows into %d&, coord.GetBlockchainSize())
Enable statetransfer for HELLO message
我们还发现当一个NVP节点刚加入网络时,它会发送一个DISC_HELLO message,随后从其他节点接收一个包含那个节点的blockchain信息的DISC_HELLO message,不过官方代码并没有给出NVP依据这些返回信息同步自己状态的实现。NVP正在网络中实施自己的状态同步时,一个新的block被mine,NVP却不能把这个新的block加入到自己的chain中。所以目前就出现了一个棘手的情况:当新的NVP节点刚加入网络时,通过HELLO message获取其他节点的blockchain信息开始同步自己的状态,这肯定需要一定的时间来完成,但与此同时,网络里的交易还在继续,新的block会被不断的mined,虽然NVP能接收到SYNC_BLOCK_ADDED,并拥有处理它的handler,但是这时候却不能将新的block信息加入到自己的chain中,因为hash不匹配,毕竟NVP节点并没有完成一开始的同步。本文介绍了在Hyperledger中数据存取的实现.
Hyperledger提供基于key/value的数据存储,其中key是字符串,value则是二进制字节数组,Hyperledger的Go API提供了三个方法用于数据存取:PutState(key, value)用于向Hyperledger中存储数据, GetState(key)用于从Hyperledger中提取数据,而DelState(key)则从Hyperledger中删除数据。
数据存取 Chaincode 示例
以下是一个简单的数据存取Chaincode, 以及其相应的REST请求。
package main
"/hyperledger/fabric/core/chaincode/shim"
type SaveState1Chaincode struct {
func (t *SaveState1Chaincode) Init(stub shim.ChaincodeStubInterface, function string, args []string) ([]byte, error) {
fmt.Printf("Init called with function %s!\n", function)
return nil, nil
func (t *SaveState1Chaincode) Invoke(stub shim.ChaincodeStubInterface, function string, args []string) ([]byte, error) {
fmt.Printf("Invoke called with function %s!\n", function)
var key, value string
key = args[0]
value = args[1]
var err error
err = stub.PutState(key, []byte(value))
if err != nil {
return nil, err
return nil, nil
func (t *SaveState1Chaincode) Query(stub shim.ChaincodeStubInterface, function string, args []string) ([]byte, error) {
fmt.Printf("Query called with function %s!\n", function)
var key string
key = args[0]
valInBytes, err := stub.GetState(key)
if err != nil {
return nil, errors.New("Failed to get state for " + key)
message := "State for "
+ key + " = " + string(valInBytes)
return []byte(message),
func main() {
err := shim.Start(new(SaveState1Chaincode))
if err != nil {
fmt.Printf("Error starting Save State chaincode: %s", err)
存储数据的REST请求
"jsonrpc": "2.0",
"method": "invoke",
"params": {
"type": 1,
"chaincodeID":{
"name":"mycc"
"ctorMsg": {
"function":"invoke",
"args":["testKey", "testValue"]
"secureContext": "jim"
获取数据的REST请求
"jsonrpc": "2.0",
"method": "query",
"params": {
"type": 1,
"chaincodeID":{
"name":"mycc"
"ctorMsg": {
"function":"query",
"args":["testKey"]
"secureContext": "jim"
关于Immutability
以上代码也可以看出Hyperledger和BitCoin和Ethereum等区块链对Immutability的不同理解, 在Hyperledger中,数据提交到区块链后不仅可以改变,还甚至可以被删除,而在BitCoin和Ethereum中数据一旦提交到区块链后就不能再被改变。
这也体现在R3的Corda区块链中,R3 CTO Richard Gendal Brown在 写道:
Immutability
The fourth feature in the “Blockchain Bundle” is often, if misleadingly, termed “immutability”: data, once committed, cannot be changed.
This isn’t quite true: if I have a piece of data then of course I can change it. What we actually mean is that: once committed, nobody else will accept a transaction from me if it tries to build on a modified version of some data that has already been accepted by other stakeholders.
Blockchains achieve this by having transactions commit to the outputs of previous transactions and have blocks commit to the content of previous blocks. Each new step can only be valid if it really does build upon an unchangeable body of previous activity.
本文介绍了在Hyperledger中数据存取的实现以及关于Immutability的讨论.
阅读(...) 评论()4403人阅读
fabric源码(6)
先前分析程序着&#30524;于细节分析,这样没有框架的概念,花了两天时间分析整理了一下hyperledger fabric的架构设计,分析该程序没有参照任何资料,如有错误欢迎指正,共同进步。
笔者在详细分析程序前有以下疑问:
1)CLI(命令行)客户端如何发送命令给Peer节点
2)本Peer节点如何接收其他节点的数据,接收到数据又如何处理,处理的方式和1又有什么区别
3)数据是何时又是如何被送入consensus模块
4)consensus模块内部又是如何架构的 为什么看起来helper executor pbft controller文件夹交至在一起,保存各自句柄,相互调用
5)ChainCode(链码,简称CC)是如何接收到Peer对其的操作、访问的
6)ChainCode是如何调用fabric API来查询写入数据的
7)在阅读源码初始化过程中,Peer节点会创建大量Server,这些Server后续过程我们是如何使用的
注:本人对于数据库、Docker相关知识不是很了解,尽量避免关于这两个部分的介绍以免错误的引导读者。
下面会慢慢的渗透以上涉及的问题。
每个Server作用:
AdminServer:控制该节点的命运,可以删除该节点所在的进程。(Start Stop GetStatus )
EventHubServer:Peer节点支持客户端对指定事件进行监听,例如Rejection等。客户端需要先注册自己关心的Events,当事件发生时trigger 监听者。
OpenChainServer:对外提供ledger的访问接口,涉及GetBlockchainInfo&GetBlockByNumber等。
DevopsServer:负责与CLI Client对接,外部进行CC操作的入口,Deploy invoke query。
ChaincodeSupportServer:负责与shim/Chaincode通信,ChainCode的所有调用接收发送都要与该Server信息交互。
PeerServer:该Server是一个Engine,Engine关联了内部消息响应实现,同时为周围Peer节点创建Client与之通信。
RESTServer:该Server没有进行分析,应该是REST接口&#26684;式相关。
一级模块分类:
之前创建服务器与之对应的客户端,可以理解成其他节点或者CLI client等。
中间层,Server与Client端 API接口定义
ServerProcess:服务响应处理函数,包括各类型的HandleMessage。
Consensus:
共识模块,目前采用的是PBFT NOOPS
ChainCode Shim:代码中shim和我理解的不一致,将ChainCodeSupport也应该算到shim,该模块的作用是连接Peer节点与ChainCode的媒介,用shim形容也可。
ChainCode:
链码,应用(例如智能合约)。
数据存储。
代码里有一个叫做Vendor的文件夹,该文件夹里涉及的功能模块自成一体,例如grpcServer等
ChainCode里面会调用Peer节点信息。
伴随着数据加解密。&
账本操作。
该代码使用Handler触发模式,在跟踪代码程序时要注意handler对象赋&#20540;位置,否则容易找错HandleMessage,这些Handler处理函数命名基本相同,容易操作混乱。
下面分析几个读者应该最关心的流程:
1)Client通过CLI执行一条invoke命令
2)某节点发送给该节点ViewChange命令
3)ChainCode调用API putStatus
4)Consensus流程
一、 Client通过CLI执行一条invoke命令
1)在Peer节点初始化的时候 创建DevopsServer
serverDevops := core.NewDevopsServer(peerServer)
pb.RegisterDevopsServer(grpcServer, serverDevops)
2)DevopsServer设置Service规范,例如Invoke
Message,调用_Devops_Invoke_Handler函数
var _Devops_serviceDesc = grpc.ServiceDesc{
ServiceName: &protos.Devops&,
HandlerType: (*DevopsServer)(nil),
Methods: []grpc.MethodDesc{
MethodName: &Login&,
_Devops_Login_Handler,
MethodName: &Build&,
_Devops_Build_Handler,
MethodName: &Deploy&,
_Devops_Deploy_Handler,
MethodName: &Invoke&,
_Devops_Invoke_Handler,
MethodName: &Query&,
_Devops_Query_Handler,
MethodName: &EXP_GetApplicationTCert&,
_Devops_EXP_GetApplicationTCert_Handler,
MethodName: &EXP_PrepareForTx&,
_Devops_EXP_PrepareForTx_Handler,
MethodName: &EXP_ProduceSigma&,
_Devops_EXP_ProduceSigma_Handler,
MethodName: &EXP_ExecuteWithBinding&,
_Devops_EXP_ExecuteWithBinding_Handler,
Streams: []grpc.StreamDesc{},
}3)其中_Devops_Invoke_Handler函数在Protos模块,其负责将Client接入的信息传递到对应的Server模块func _Devops_Invoke_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
in := new(ChaincodeInvocationSpec)
if err := dec(in); err != nil {
return nil, err
out, err := srv.(DevopsServer).Invoke(ctx, in)
if err != nil {
return nil, err
return out, nil
4)在函数在devops服务端代码中处理
func (d *Devops) Invoke(ctx context.Context, chaincodeInvocationSpec *pb.ChaincodeInvocationSpec) (*pb.Response, error) {
return d.invokeOrQuery(ctx, chaincodeInvocationSpec, chaincodeInvocationSpec.ChaincodeSpec.Attributes, true)
5)精简invokeOrQuery代码,d.coord 是PeerServer对象,ExecuteTransaction 是对应Engine的实现方法
func (d *Devops) invokeOrQuery(ctx context.Context, chaincodeInvocationSpec *pb.ChaincodeInvocationSpec, attributes []string, invoke bool) (*pb.Response, error) {
resp := d.coord.ExecuteTransaction(transaction)
6)本次请求被封装成交易Struct,该处理是在PeerServer中。
func (p *Impl) ExecuteTransaction(transaction *pb.Transaction) (response *pb.Response) {
if p.isValidator {
response = p.sendTransactionsToLocalEngine(transaction)
peerAddresses := p.discHelper.GetRandomNodes(1)
response = p.SendTransactionsToPeer(peerAddresses[0], transaction)
return response
7)思考可知,最终这笔transaction是要交给到Consensus进行处理,那么如何传递的呢?就在下面p.engine.ProcessTransactionMsg,其中&p&代指PeerServer,engine是在创建PeerServer的时候指定的Engine,而这个Engine的handler实现在Consensus里,在实现EngineHandler过程中加载了PBFT算法。所以ProcessTransactionMsg函数的实现在consensus模块engine代码里。这样解决了开始时提出的疑问3)。
func (p *Impl) sendTransactionsToLocalEngine(transaction *pb.Transaction) *pb.Response {
peerLogger.Debugf(&Marshalling transaction %s to send to local engine&, transaction.Type)
data, err := proto.Marshal(transaction)
if err != nil {
return &pb.Response{Status: pb.Response_FAILURE, Msg: []byte(fmt.Sprintf(&Error sending transaction to local engine: %s&, err))}
var response *pb.Response
msg := &pb.Message{Type: pb.Message_CHAIN_TRANSACTION, Payload: data, Timestamp: util.CreateUtcTimestamp()}
peerLogger.Debugf(&Sending message %s with timestamp %v to local engine&, msg.Type, msg.Timestamp)
response = p.engine.ProcessTransactionMsg(msg, transaction)
return response
8)从这里开始进入了consensus内部处理,在这里Consensus模块是单独分析。
func (eng *EngineImpl) ProcessTransactionMsg(msg *pb.Message, tx *pb.Transaction) (response *pb.Response) {
err := eng.consenter.RecvMsg(msg, eng.peerEndpoint.ID)
画图说明上述流程:
该图中没有体现的一点是在Devops Server创建的时候将PeerServer对象作为构造参数传入,而PeerServer创建的过程就是创建Engine的过程,也是加载Engine-handler的过程,而Engine-handler的实现在Consensus模块。图中直接从Devops Server 跳入Consensus模块有些突兀。
参考知识库
* 以上用户言论只代表其个人观点,不代表CSDN网站的观点或立场
访问:9362次
排名:千里之外

我要回帖

更多关于 ibm hyperledger 的文章

 

随机推荐