兄弟连Go语言+区块链技术培训以太坊源码分析(30)eth-bloombits和filter源码分析_第1页
兄弟连Go语言+区块链技术培训以太坊源码分析(30)eth-bloombits和filter源码分析_第2页
兄弟连Go语言+区块链技术培训以太坊源码分析(30)eth-bloombits和filter源码分析_第3页
兄弟连Go语言+区块链技术培训以太坊源码分析(30)eth-bloombits和filter源码分析_第4页
兄弟连Go语言+区块链技术培训以太坊源码分析(30)eth-bloombits和filter源码分析_第5页
已阅读5页,还剩12页未读 继续免费阅读

下载本文档

版权说明:本文档由用户提供并上传,收益归属内容提供方,若内容存在侵权,请进行举报或认领

文档简介

求E兄弟连教育兄弟连Go语言+区块链技术培训以太坊源码分析(30)eth-bloombits和filter源码分析##以太坊的布隆过滤器以太坊的区块头中包含了一个叫做logsBloom的区域。这个区域存储了当前区块中所有的收据的日志的布隆过滤器,一共是2048个bit。也就是256个字节。而我们的一个交易的收据包含了很多的日志记录。每个日志记录包含了合约的地址,多个Topic。而在我们的收据中也存在一个布隆过滤器,这个布隆过滤器记录了所有的日志记录的信息。如果我们看黄皮书里面对日志记录的形式化定义。O代表我们的日志记录,Oa代表logger的地址,Oto,Ot1代表日志的Topics,Od代表时间。Oa是20个字节,Ot是32个字节,Od是很多字节我们定义了一个布隆过滤器函数M,用来把一个日志对象转换成256字节的hashM3:2045是一个特别的函数,用来设置2048个bit位中的三位为1。对于任意的输入值,首先求他的KEC输出,然后通过取KEC输出的[0,1][2,3],[4,5]这几位的值对2048取模,得到三个值,这三个值就是输出的2048中需要置位的下标。也就是说对于任何一个输入,如果它对应的三个下标的值不都为1,那么它肯定不在这个区块中。当如如果对应的三位都为1,也不能说明一定在这个区块中。这就是布隆过滤器的特性。收据中的布隆过滤器就是所有的日志的布隆过滤器输出的并集。同时区块头中的logBloom,就是所有的收据的布隆过滤器的并集。##ChainIndexer和BloomIndexer最开始看到ChainIndexer,不是很明白是什么功能。其实从名字中可以看到,是Chain的索引。在eth中我们有看到BloomIndexer,这个就是布隆过滤器的索引。在我们的协议中提供了查找指定Log的功能。用户可以通过传递下面的参数来查找指定的Log,开始的区块号,结束的区块号,根据合约Addresses指定的地址过滤,根据指定的Topics来过滤。//FilterCriteriarepresentsarequesttocreateanewfilter.typeFilterCriteriastruct{FromBlock*big.IntToBlock*big.IntAddresses[]common.AddressTopics[][]common.Hash}如果开始和结束之间间隔很大,那么如果直接依次检索每个区块头的logBloom区域是比较低效的。因为每个区块头都是分开存储的,可能需要非常多的磁盘随机访问。所以以太坊协议在本地维护了一套索引,用来加速这个过程。大致原理是。每4096个区块称为一个Section,—个Section里面的logBloom会存储在一起。对于每个Section,用一个二维数据,A[2048][4096]来存储。第一维2048代表了bloom过滤器的长度2048个字节。第二维4096代表了一个Section里面的所有区块,每一个位置按照顺序代表了其中的一个区块。A[0][0]=blockchain[section*4096+0].logBloom[0],A[0][1]=blockchain[section*4096+1].logBloom[0],A[0][4096]=blockchain[section*4096+1].logBloom[0],A[1][0]=blockchain[section*4096+0].logBloom[1],A[1][1024]=blockchain[section*4096+1024].logBloom[1],A[2047][1]=blockchain[section*4096+1].logBloom[2047],如果Section填充完毕,那么会写成2048个KV。![image](picture/bloom_6.png)##bloombit.go代码分析这个代码相对不是很独立,如果单独看这个代码,有点摸不着头脑的感觉,因为它只是实现了一些接口,具体的处理逻辑并不在这里,而是在core里面。不过这里我先结合之前讲到的信息分析一下。后续更详细的逻辑在分析core的代码的时候再详细分析。服务线程startBloomHandlers,这个方法是为了响应具体的查询请求,给定指定的Section和bit来从levelDB里面查询然后返回出去。单独看这里有点摸不着头脑。这个方法的调用比较复杂。涉及到core里面的很多逻辑。这里先不细说了。直到有这个方法就行了。typeRetrievalstruct{Bituint //Bit的取值0-2047代表了想要获取哪一位的值Sections[]uint64//那些SectionBitsets[][]byte//返回值查询出来的结果。}//startBloomHandlersstartsabatchofgoroutinestoacceptbloombitdatabase//retrievalsfrompossiblyarangeoffiltersandservingthedatatosatisfy.func(eth*Ethereum)startBloomHandlers(){fori:=0;i<bloomServiceThreads;i++{gofunc(){for{select{case<-eth.shutdownChan:returncaserequest:=<-eth.bloomRequests://request是一个通道task:=<-request//从通道里面获取一个tasktask.Bitsets=make([][]byte,len(task.Sections))fori,section:=rangetask.Sections{head:=core.GetCanonicalHash(eth.chainDb,(section+1)*params.BloomBitsBlocks-1)blob,err:=bitutil.DecompressBytes(core.GetBloomBits(eth.chainDb,task.Bit,section,head),int(params.BloomBitsBlocks)/8)iferr!=nil{

panic(err)}task.Bitsets[i]=blob}request<-task//通过request通道返回结果}}}()}}###数据结构Bloomlndexer对象主要用户构建索引的过程,是core.ChainIndexer的一个接口实现,所以只实现了一些必须的接口。对于创建索引的逻辑还在core.ChainIndexer里面。//BloomIndexerimplementsacore.ChainIndexer,buildinguparotatedbloombitsindex//fortheEthereumheaderbloomfilters,permittingblazingfastfiltering.typeBloomIndexerstruct{sizeuint64//sectionsizetogeneratebloombitsfordbethdb.Database//databaseinstancetowriteindexdataandmetadataintogen*bloombits.Generator//generatortorotatethebloombitscratingthebloomindexsectionuint64//Sectionisthesectionnumberbeingprocessedcurrently当前的sectionheadcommon.Hash//Headisthehashofthelastheaderprocessed}//NewBloomIndexerreturnsachainindexerthatgeneratesbloombitsdataforthe//canonicalchainforfastlogsfiltering.funcNewBloomIndexer(dbethdb.Database,sizeuint64)*core.ChainIndexer{backend:=&BloomIndexer{db:db,size:size,}table:=ethdb.NewTable(db,string(core.BloomBitsIndexPrefix))returncore.NewChainIndexer(db,table,backend,size,bloomConfirms,bloomThrottling,"bloombits")}Reset实现了ChainIndexerBackend的方法,启动一个新的section//Resetimplementscore.ChainIndexerBackend,startinganewbloombitsindex//section.func(b*BloomIndexer)Reset(sectionuint64){gen,err:=bloombits.NewGenerator(uint(b.size))iferr!=nil{panic(err)}b.gen,b.section,b.head=gen,section,common.Hash{}}Process实现了ChainIndexerBackend,增加一个新的区块头到index//Processimplementscore.ChainIndexerBackend,addinganewheader'sbloominto//theindex.func(b*BloomIndexer)Process(header*types.Header){b.gen.AddBloom(uint(header.Number.Uint64()-b.section*b.size),header.Bloom)b.head=header.Hash()}Commit方法实现了ChainIndexerBackend,持久化并写入数据库。//Commitimplementscore.ChainIndexerBackend,finalizingthebloomsectionand//writingitoutintothedatabase.func(b*BloomIndexer)Commit()error{batch:=b.db.NewBatch()fori:=0;i<types.BloomBitLength;i++{bits,err:=b.gen.Bitset(uint(i))iferr!=nil{returnerr}core.WriteBloomBits(batch,uint(i),b.section,b.head,bitutil.CompressBytes(bits))}returnbatch.Write()}##filter/api.go源码分析eth/filter包包含了给用户提供过滤的功能,用户可以通过调用对交易或者区块进行过滤,然后持续的获取结果,如果5分钟没有操作,这个过滤器会被删除。过滤器的结构。var(deadline=5*time.Minute//considerafilterinactiveifithasnotbeenpolledforwithindeadline)//filterisahelperstructthatholdsmetainformationoverthefiltertype//andassociatedsubscriptionintheeventsystem.typefilterstruct{typType //过滤器的类型,过滤什么类型的数据deadline*time.Timer//filterisinactivwhendeadlinetriggers当计时器响起的时候,会触发定时器。hashes[]common.Hash//过滤出来的hash结果critFilterCriteria//过滤条件logs[]*types.Log//过滤出来的Log信息s*Subscription//associatedsubscriptionineventsystem事件系统中的订阅器。}构造方法//PublicFilterAPIofferssupporttocreateandmanagefilters.Thiswillallowexternalclientstoretrievevarious//informationrelatedtotheEthereumprotocolsuchalsblocks,transactionsandlogs.//PublicFilterAPI用来创建和管理过滤器。允许外部的客户端获取以太坊协议的一些信息,比如区块信息,交易信息和日志信息。typePublicFilterAPIstruct{backendBackendmux*event.TypeMuxquitchanstruct{}chainDbethdb.Databaseevents*EventSystemfiltersMusync.Mutexfiltersmap[rpc.ID]*filter}//NewPublicFilterAPIreturnsanewPublicFilterAPIinstance.funcNewPublicFilterAPI(backendBackend,lightModebool)*PublicFilterAPI{api:=&PublicFilterAPI{backend:backend,mux:backend.EventMux(),chainDb:backend.ChainDb(),events:NewEventSystem(backend.EventMux(),backend,lightMode),filters:make(map[rpc.ID]*filter),}goapi.timeoutLoop()returnapi}###超时检查//timeoutLooprunsevery5minutesanddeletesfiltersthathavenotbeenrecentlyused.//Ttisstartedwhentheapiiscreated.//每隔5分钟检查一下。如果过期的过滤器,删除。func(api*PublicFilterAPI)timeoutLoop(){ticker:=time.NewTicker(5*time.Minute)for{<-ticker.Capi.filtersMu.Lock()forid,f:=rangeapi.filters{select{case<-f.deadline.C:f.s.Unsubscribe()delete(api.filters,id)default:continue}}api.filtersMu.Unlock()}}NewPendingTransactionFilter,用来创建一个PendingTransactionFilter。这种方式是用来给那种无法创建长连接的通道使用的(比如HTTP),如果对于可以建立长链接的通道(比如WebSocket)可以使用rpc提供的发送订阅模式来处理,就不用持续的轮询了//NewPendingTransactionFiltercreatesafilterthatfetchespendingtransactionhashes//astransactionsenterthependingstate.////Itispartofthefilterpackagebecausethisfiltercanbeusedthrougthe//'eth_getFilterChanges'pollingmethodthatisalsousedforlogfilters./////ethereum/wiki/wiki/JSON-RPC#eth_newpendingtransactionfilterfunc(api*PublicFilterAPI)NewPendingTransactionFilter()rpc.ID{var(pendingTxs=make(chancommon.Hash)//在事件系统订阅这种消息pendingTxSub=api.events.SubscribePendingTxEvents(pendingTxs))api.filtersMu.Lock()api.filters[pendingTxSub.ID]=&filter{typ:PendingTransactionsSubscription,deadline:time.NewTimer(deadline),hashes:make([]common.Hash,0),s:pendingTxSub}api.filtersMu.Unlock()gofunc(){for{select{caseph:=<-pendingTxs://接收至UpendingTxs,存储在过滤器的hashes容器里面。api.filtersMu.Lock()iff,found:=api.filters[pendingTxSub.ID];found{f.hashes=append(f.hashes,ph)}api.filtersMu.Unlock()case<-pendingTxSub.Err():api.filtersMu.Lock()delete(api.filters,pendingTxSub.ID)api.filtersMu.Unlock()return}}}()returnpendingTxSub.ID}轮询:GetFilterChanges//GetFilterChangesreturnsthelogsforthefilterwiththegivenidsince//lasttimeitwascalled.Thiscanbeusedforpolling.//GetFilterChanges用来返回从上次调用到现在的所有的指定id的所有过滤信息。这个可以用来轮询。//Forpendingtransactionandblockfilterstheresultis[]common.Hash.//(pending)Logfiltersreturn[]Log.//对于pendingtransaction和block的过滤器,返回结果类型是[]common.Hash.对于pendingLog过滤器,返回的是[]Log///ethereum/wiki/wiki/JSON-RPC#eth_getfilterchangesfunc(api*PublicFilterAPI)GetFilterChanges(idrpc.ID)(interface{},error){api.filtersMu.Lock()deferapi.filtersMu.Unlock()iff,found:=api.filters[id];found{if!f.deadline.Stop(){//如果定时器已经触发,但是filter还没有移除,那么我们先接收定时器的值,然后重置定时器//timerexpiredbutfilterisnotyetremovedintimeoutloop//receivetimervalueandresettimer<-f.deadline.C}f.deadline.Reset(deadline)switchf.typ{casePendingTransactionsSubscription,BlocksSubscription:hashes:=f.hashesf.hashes=nilreturnreturnHashes(hashes),nilcaseLogsSubscription:logs:=f.logsf.logs=nilreturnreturnLogs(logs),nil}return[]interface{}{},fmt.Errorf("filternotfound")}对于可以建立长连接的通道,可以直接使用rpc的发送订阅模式,这样客户端就可以直接接收到过滤信息,不用调用轮询的方式了。可以看到这种模式下面并没有添加到filters这个容器,也没有超时管理了。也就是说支持两种模式。//NewPendingTransactionscreatesasubscriptionthatistriggeredeachtimeatransaction//entersthetransactionpoolandwassignedfromoneofthetransactionsthisnodesmanages.func(api*PublicFilterAPI)NewPendingTransactions(ctxcontext.Context)(*rpc.Subscription,error){notifier,supported:=rpc.NotifierFromContext(ctx)if!supported{return&rpc.Subscription{},rpc.ErrNotificationsUnsupported}rpcSub:=notifier.CreateSubscription()gofunc(){txHashes:=make(chancommon.Hash)pendingTxSub:=api.events.SubscribePendingTxEvents(txHashes)for{select{caseh:=<-txHashes:notifier.Notify(rpcSub.ID,h)case<-rpcSub.Err():pendingTxSub.Unsubscribe()returncase<-notifier.Closed():pendingTxSub.Unsubscribe()return}}}()returnrpcSub,nil日志过滤功能,根据FilterCriteria指定的参数,来对日志进行过滤,开始区块,结束区块,地址和Topics,这里面引入了一个新的对象filter//FilterCriteriarepresentsarequesttocreateanewfilter.typeFilterCriteriastruct{FromBlock*big.IntToBlock*big.IntAddresses[]common.AddressTopics[][]common.Hash}//GetLogsreturnslogsmatchingthegivenargumentthatarestoredwithinthestate./////ethereum/wiki/wiki/JSON-RPC#eth_getlogsfunc(api*PublicFilterAPI)GetLogs(ctxcontext.Context,critFilterCriteria)([]*types.Log,error){//ConverttheRPCblocknumbersintointernalrepresentationsifcrit.FromBlock==nil{crit.FromBlock=big.NewInt(rpc.LatestBlockNumber.Int64())}ifcrit.ToBlock==nil{crit.ToBlock=big.NewInt(rpc.LatestBlockNumber.Int64())}//Createandrunthefiltertogetallthelogs//创建了一个Filter对象然后调用filter.Logsfilter:=New(api.backend,crit.FromBlock.Int64(),crit.ToBlock.Int64(),crit.Addresses,crit.Topics)logs,err:=filter.Logs(ctx)iferr!=nil{returnnil,err}returnreturnLogs(logs),err}##filter.gofiter.go里面定义了一个Filter对象。这个对象主要用来根据区块的BloomIndexer和布隆过滤器等来执行日志的过滤功能。###数据结构//后端,这个后端其实是在core里面实现的。布隆过滤器的主要算法在core里面实现了。typeBackendinterface{ChainDb()ethdb.DatabaseEventMux()*event.TypeMuxHeaderByNumber(ctxcontext.Context,blockNrrpc.BlockNumber)(*types.Header,error)GetReceipts(ctxcontext.Context,blockHashcommon.Hash)(types.Receipts,error)SubscribeTxPreEvent(chan<-core.TxPreEvent)event.SubscriptionSubscribeChainEvent(chchan<-core.ChainEvent)event.SubscriptionSubscribeRemovedLogsEvent(chchan<-core.RemovedLogsEvent)event.SubscriptionSubscribeLogsEvent(chchan<-[]*types.Log)event.SubscriptionBloomStatus()(uint64,uint64)ServiceFilter(ctxcontext.Context,session*bloombits.MatcherSession)}//Filtercanbeusedtoretrieveandfilterlogs.typeFilterstruct{backendBackend //后端dbethdb.Database//数据库begin,endint64 //开始结束区块addresses[]common.Address//筛选地址topics[][]common.Hash//筛选主题matcher*bloombits.Matcher//布隆过滤器的匹配器}构造函数把address和topic都加入到filters容器。然后构建了一个bloombits.NewMatcher(size,filters)。这个函数在core里面实现,暂时不会讲解。//Newcreatesanewfilterwhichusesabloomfilteronblockstofigureoutwhether//aparticularblockisinterestingornot.funcNew(backendBackend,begin,endint64,addresses[]common.Address,topics[][]common.Hash)*Filter{//Flattentheaddressandtopicfilterclausesintoasinglebloombitsfilter//system.Sincethebloombitsarenotpositional,niltopicsarepermitted,//whichgetflattenedintoanilbyteslice.varfilters[][][]byteiflen(addresses)>0{filter:=make([][]byte,len(addresses))fori,address:=rangeaddresses{filter[i]=address.Bytes()}filters=append(filters,filter)}for_,topicList:=rangetopics{filter:=make([][]byte,len(topicList))fori,topic:=rangetopicList{filter[i]=topic.Bytes()}filters=append(filters,filter)}//Assembleandreturnthefiltersize,_:=backend.BloomStatus()return&Filter{backend:backend,begin:begin,end:end,addresses:addresses,topics:topics,db:backend.ChainDb(),matcher:bloombits.NewMatcher(size,filters),}}Logs执行过滤//Logssearchestheblockchainformatchinglogentries,returningallfromthe//firstblockthatcontainsmatches,updatingthestartofthefilteraccordingly.func(f*Filter)Logs(ctxcontext.Context)([]*types.Log,error){//Figureoutthelimitsofthefilterrangeheader,_:=f.backend.HeaderByNumber(ctx,rpc.LatestBlockNumber)ifheader==nil{returnnil,nil}head:=header.Number.Uint64()iff.begin==-1{f.begin=int64(head)}end:=uint64(f.end)iff.end==-1{end=head}//Gatherallindexedlogs,andfinishwithnonindexedonesvar(logs[]*types.Logerrerror)size,sections:=f.backend.BloomStatus()//indexed是指创建了索引的区块的最大值。如果过滤的范围落在了创建了索引的部分。//那么执行索引搜索。ifindexed:=sections*size;indexed>uint64(f.begin){ifindexed>end{logs,err=f.indexedLogs(ctx,end)}else{logs,err=f.indexedLogs(ctx,indexed-1)}iferr!=nil{returnlogs,err}}//对于剩下的部分执行非索引的搜索。rest,err:=f.unindexedLogs(ctx,end)logs=append(logs,rest...)returnlogs,err}索引搜索//indexedLogsreturnsthelogsmatchingthefiltercriteriabasedonthebloom//bitsindexedavailablelocallyorviathenetwork.func(f*Filter)indexedLogs(ctxcontext.Context,enduint64)([]*types.Log,error){//Createamatchersessionandrequestservicingfromthebackendmatches:=make(chanuint64,64)//启动matchersession,err:=f.matcher.Start(uint64(f.begin),end,matches)iferr!=nil{returnnil,err}defersession.Close(time.Second)//进行过滤服务。这些都在core里面。后续分析core的代码会进行分析。f.backend.ServiceFilter(ctx,session)//Iterateoverthematchesuntilexhaustedorcontextclosedvarlogs[]*types.Logfor{select{casenumber,ok:=<-matches://Abortifallmatcheshavebeenfulfilledif!ok{//没有接收到值并且channel已经被关闭f.begin=int64(end)+1//更新begin。以便于下面的非索引搜索returnlogs,nil}//Retrievethesuggestedblockandpullanytrulymatchinglogsheader,err:=f.backend.HeaderByNumber(ctx,rpc.BlockNumber(number))ifheader==nil||err!=nil{returnlogs,err}found,err:=f.checkMatches(ctx,header)//查找匹配的值iferr!=nil{returnlogs,err}logs=append(logs,found...)case<-ctx.Done():returnlogs,ctx.Err()}}}checkMatches,拿到所有的收据,并从收据中拿到所有的日志。执行filterLogs方法。//checkMatcheschecksifthereceiptsbelongingtothegivenheadercontainanylogeventsthat//matchthefiltercriteria.Thisfunctioniscalledwhenthebloomfiltersignalsapotentialmatch.func(f*Filter)checkMatches(ctxcontext.Context,header*types.Header)(logs[]*types.Log,errerror){//Getthelogsoftheblockreceipts,err:=f.backend.GetReceipts(ctx,header.Hash())iferr!=nil{returnnil,err}varunfiltered[]*types.Logfor_,receipt:=rangereceipts{unfiltered=append(unfiltered,([]*types.Log)(receipt.Logs)...)}logs=filterLogs(

温馨提示

  • 1. 本站所有资源如无特殊说明,都需要本地电脑安装OFFICE2007和PDF阅读器。图纸软件为CAD,CAXA,PROE,UG,SolidWorks等.压缩文件请下载最新的WinRAR软件解压。
  • 2. 本站的文档不包含任何第三方提供的附件图纸等,如果需要附件,请联系上传者。文件的所有权益归上传用户所有。
  • 3. 本站RAR压缩包中若带图纸,网页内容里面会有图纸预览,若没有图纸预览就没有图纸。
  • 4. 未经权益所有人同意不得将文件中的内容挪作商业或盈利用途。
  • 5. 人人文库网仅提供信息存储空间,仅对用户上传内容的表现方式做保护处理,对用户上传分享的文档内容本身不做任何修改或编辑,并不能对任何下载内容负责。
  • 6. 下载文件中如有侵权或不适当内容,请与我们联系,我们立即纠正。
  • 7. 本站不保证下载资源的准确性、安全性和完整性, 同时也不承担用户因使用这些下载资源对自己和他人造成任何形式的伤害或损失。

评论

0/150

提交评论