如何使用rust-libp2p实现区块同步?

寄意兰舟 发布在 海盗号 87663

去年年初,很多人都说rust开发区块链如何好,然后就学习了一下。最先接触到的是substrate里面的网络模块,当时对libp2p不是很了解,rust语法也一直半解,以致只能看懂应用消息的转发流程,如何发到对方节点就不是很清楚,年初学习了下eth 2.0客户端Lighthouse,里面宏比较少结构相对清晰很多。

本文讨论的节点同步有历史块同步、新区块缓存,可参考以太坊download,fetcher模块。不涉及节点发现,加密连接。

 

历史块同步

 

libp2p封装性很好,提供了一系列模块,如果只是简单的发送区块,通过Gossip可以很容易的做到。 如果新节点启动去同步指定节点,发送请求消息的时候,你发发现这个需要提供的模块很难满足你的要求,你需要实现自己的Behaviour,ProtocolsHandler,UpgradeInfo,InboundUpgrade,OutboundUpgrade等一系列trait。

impl<TSubstream> NetworkBehaviour for P2P<TSubstream>
    where  TSubstream: AsyncRead + AsyncWrite,
{
    type ProtocolsHandler = P2PHandler<TSubstream>;
    type OutEvent = P2PMessage;

fn new_handler(&mut self) -> Self::ProtocolsHandler { P2PHandler::new( SubstreamProtocol::new(P2PProtocol), Duration::from_secs(30), &self.log, ) }

fn inject_connected(&mut self, peer_id: PeerId, connected_point: ConnectedPoint) { self.events.push(NetworkBehaviourAction::GenerateEvent( P2PMessage::InjectConnect(peer_id,connected_point), )); }

fn inject_disconnected(&mut self, peer_id: &PeerId, _: ConnectedPoint) { // inform the p2p handler that the peer has disconnected self.events.push(NetworkBehaviourAction::GenerateEvent( P2PMessage::PeerDisconnected(peer_id.clone()), )); }

fn inject_node_event(&mut self, source: PeerId, event: <Self::ProtocolsHandler as ProtocolsHandler>::OutEvent, ) { // send the event to the user self.events .push(NetworkBehaviourAction::GenerateEvent(P2PMessage::P2P( source, event,))); }

fn poll(&mut self, _: &mut impl PollParameters, ) -> Async< NetworkBehaviourAction< <Self::ProtocolsHandler as ProtocolsHandler>::InEvent, Self::OutEvent,>, > { if !self.events.is_empty() { return Async::Ready(self.events.remove(0)); } Async::NotReady } }

如何实现自定义协议内容有点多,打算放到后面讲解。

 

握手消息

 

当节点建立连接后,节点需要交换本地的创世hash,区块高度,网络ID,判断和对方是否在同一个区块链网络里面。

#[derive(Serialize, Deserialize,Clone, Debug, PartialEq)]
pub struct StatusMessage {
    /// genesis block hash
    pub genesis_hash: Hash,
    /// Latest finalized root.
    pub finalized_root: Hash,
    /// Latest finalized number.
    pub finalized_number: u64,
    /// The latest block root.
    pub head_root: Hash,
    /// The slot associated with the latest block root.
    pub network_id: u16,
}
如果网络id,创世hash都一致,对方的finalized_number比你高,本地需要维护对方的状态信息,然后启动同步。
// add the peer to the head's pool
        self.chains.target_head_slot = remote.finalized_number;
        self.chains.target_head_root = remote.finalized_root;
        self.chains.add_peer(network, peer_id);
        let local = self.chain.read().unwrap().current_block().height();
        self.chains.start_syncing(network, local);

批量区块下载请求

 

由于当新节点加入网络的时候,当前全网出块高度已经很高了,你不可能一个一个的下载,需要一次下载多个区块,这时候每个请求包需要携带起始高度,请求多少个块,多少个块回传一次。

pub struct BlocksByRangeRequest {
    /// The hash tree root of a block on the requested chain.
    pub head_block_root: Hash,

/// The starting slot to request blocks. pub start_slot: u64,

/// The number of blocks from the start slot. pub count: u64,

/// The step increment to receive blocks. /// /// A value of 1 returns every block. /// A value of 2 returns every second block. /// A value of 3 returns every third block and so on. pub step: u64, }

新区块缓存

 

同步的启动策略一般是当本地高度和对方节点上链高度相差一定高度常量,才会启动。 想象一下,握手时对方高度是100,当你同步完这些区块时,对方节点已经到120了,对方发送的区块121,收到后是无法上链的,而同步高度常量是50,只能等到150才能再次启动同步。 解决这个问题需要引入区块缓存。

 

新区块gossip通知

 

每产生一个区块,区块生产者都会通过gossip发送到订阅这个topic的节点,gossip为libp2p实现,内部维护节点管理。

// create the network topic to send on
        let topic = GossipTopic::MapBlock;
        let message = PubsubMessage::Block(bincode::serialize(&data).unwrap());
        self.network_send
            .try_send(NetworkMessage::Publish {
                topics: vec![topic.into()],
                message,
            })
            .unwrap_or_else(|_| warn!(self.log, "Could not send gossip message."));
 

优先级队列

 

将收到的区块缓存到队列里面,按着高度的负数进行插入,这样每次获取队列元素第一个都是最小区块,和本地上链高度比较。

  • 如果高度刚好等于上链高度+1,则插入blockchain
  • 小于则丢弃
  • 大于丢入队列
if !self.queue.is_empty() {
            let (block_low,height_low_negative) = self.queue.peek().unwrap();
            let height_low_ref = *height_low_negative;
            let height_low = (-height_low_ref) as u64;
            if  height_low > current_block.height() + 1 {
                self.queue.push(block_low.clone(),height_low_ref);
            } else if height_low == current_block.height() + 1 {
                let broadcast = match self.chain.write().expect("").insert_block_ref(&block) {
                    Ok(_) => {
                        true
                    }
                    Err(e) => {
                        println!("network insert_block,Error: {:?}", e);
                        false
                    }
                };
                return broadcast
            }
        }
 

异步库

 

网络同步少不了异步线程,需要channel来传递消息。libp2p 0.16以前好像都是使用Tokio io,之后换成了async std。

futures也发生了一些变化。

  • 0.1 版本的 Future 签名中包含了一个 Error 关联类型,而且 poll 总是会返回一个 Result。
  • 0.3 版本里该错误类型已被移除,对于错误需要显式处理。为了保持行为上的一致性,我们需要将代码里所有 Future<Item=Foo, Error=Bar> 替换为Future<Output=Result<Foo, Bar>>(留意 Item 到 Output 的名称变化)
pool()方法返回值也变了
Async::Ready -> Poll::Pending
感兴趣的可以一起学习。

本文链接:https://www.8btc.com/media/614532
转载请注明文章出处

文章标签: Rust
评论
登录 账号发表你的看法,还没有账号?立即免费 注册