8BTCCI: 13788.60 +0.59% 8BTCVI: 5967.29 +0.12% 24H成交额: ¥3180.16亿 -11.13% 总市值: ¥19104.03亿 +0.58%
精通IPFS:IPFS 获取内容之下篇

精通IPFS:IPFS 获取内容之下篇

乔疯 发布在 技术指南 链圈子 35796

在上篇文章《IPFS 获取内容之上篇》中,我们提到调用 streamBytes 函数,根据偏移量、长度及节点的连接数组,获取指定的内容。在获取文件过程中,通过调用 streamBytes 函数来获取整个文件的内容。streamBytes 函数调用完成后返回的 pull 函数生成的 through 流就是我们要读取内容的流,它最终被传递给 IPFS core/components/get-pull-stream.js 中返回的 pull 函数生成的 through 流,而这个流又被同目录下 get.js 文件中 pull-stream 类库中的 asyncMap 流的处理函数转换为完整的缓冲区,从而被最终的应用的所使用,这段程序代码如下:
pull(
  self.getPullStream(ipfsPath, options),
  pull.asyncMap((file, cb) => {
    if (file.content) {
      pull(
        file.content,
        pull.collect((err, buffers) => {
          if (err) { return cb(err) }
          file.content = Buffer.concat(buffers)
          cb(null, file)
        })
      )
    } else {
      cb(null, file)
    }
  }),
  pull.collect(callback)
)
上篇文章的内容,我们回忆到这里就结束了,下面我们仔细研究 streamBytes 函数及相关的深度遍历是如何实现的。

streamBytes 函数使用了 pull-traverse 类库提供深度优先、广度优先、叶子优先等算法,它的每个算法都返回一个 pull 类库的 through 流,这个流被它后面的流所调用。在这里使用深度优先算法,返回的流被 pull 类库的 map 流所调用,用于获取每一个元素。

深度优先算法的相关代码如下:

var once = exports.once =
function (value) {
  return function (abort, cb) {
    if(abort) return cb(abort)
    if(value != null) {
      var _value = value; value = null
      cb(null, _value)
    } else
      cb(true)
  }
}

var depthFirst = exports.depthFirst = function (start, createStream) {   var reads = [], ended

  reads.unshift(once(start))

  return function next (end, cb) {     if(!reads.length)       return cb(true)     if(ended)       return cb(ended)

    reads[0](end, function (end, data) {       if(end) {         if(end !== true) {           ended = end           reads.shift()

          while(reads.length)             reads.shift()(end, function () {})

          return cb(end)         }

        reads.shift()         return next(null, cb)       }

      reads.unshift(createStream(data))       cb(end, data)     })   } }

streamBytes 函数定义在 file.js 文件中,我们来看下它的内容:
function streamBytes (dag, node, fileSize, offset, length) {
  if (offset === fileSize || length === 0) {
    return once(Buffer.alloc(0))
  }

  const end = offset + length

  return pull(     traverse.depthFirst({       node,       start: 0,       end: fileSize     }, getChildren(dag, offset, end)),     map(extractData(offset, end)),     filter(Boolean)   ) }

根据深度优先算法代码我们可知道,它首先把第一个参数包装成一个 pull 类库的 once 流,在这里即把我们的根 DAG 节点包装成一个 once 流,然后作为内部数组的第一个元素,最后返回 pull 类库的 through 流。
我们把返回类似 function next (end, cb) {} 签名的函数称为 pull 类库的 through 流,这个函数被称为读函数。因为它会被后面的流所调用,用来从流中读取数据,当读取数据之后,这个函数通过参数中指定的回调函数把数据传递给后面的流,即传递给调用自己的流。
当 pull 类库的 map 函数返回的 through 流调用深度遍历函数所返回的读取函数时,该读取函数执行如下:
  1. 如果内部数组中没有数据可以读取,那么调用 map 函数返回的 through 流的读取函数,并返回。
    if(!reads.length)
      return cb(true)
    
  2. 如果 ended 变量为真,那么以这个变量为参数调用 map 函数返回的 through 流的读取函数,并返回。
    if(ended)
      return cb(ended)
    
  3. 最后,调用内部数组的第一个元素(类型为 pull 类库的 through 流的读取函数)来读取数据。当读取到数据之后,调用自定义的内部函数来处理数据,在这个内部函数中处理如下:
    • 如果当前读取完成,即 end 为真时,执行下面逻辑。如果 end 不是严格真(出现在变量为字符串 true 情况),那么:设置变量 endedend 的值;删除数组中的第一个元素;如果数组长度不为0,那么持续删除第一个元素(类型为函数),并且调用这个删除的元素;当数组长度为空时,调用回调函数进行处理。否则,即 end 严格为真,从数组中删除第一个元素,因为这意味着数组的当前元素的已经处理完成,所以需要调用外层函数继续从数组中读取数据。

      if(end) { if(end !== true) { ended = end reads.shift()

        while(reads.length)
          reads.shift()(end, function () {})

        return cb(end) }

      reads.shift() return next(null, cb)

      }
    • 调用 createStream 函数来处理读取到的数据,这个 createStream 函数即是我们提供给深度遍历算法的第二个参数 getChildren 函数返回的内部函数。getChildren 函数返回的内部函数最终会返回一个 pull 函数生成的 through 流。在这个流中通过 pull-stream 类库的 flatten 流会把获取到的每个节点及其内部节点最终转换成一个数组形式,比如把
      [1, 2, 3],
      [4, 5, 6],
      [7, 8]
      
      这样的形式转化成下面的形式
      [1, 2, 3, 4, 5, 6, 7, 8]
      
      这里 [1, 2, 3] 可以认为是第一个 Link 碎片,它下面又有三个包含最终数据的 DAG 节点;[4, 5, 6] 是第二个 Link 碎片,它下面也有三个包含最终数据的 DAG 节点; [7, 8] 是第三个 Link 碎片,它下面只有二个包含最终数据的 DAG 节点。

      我们可以发现,通过深度遍历优先算法及其处理函数 getChildren 返回的内部函数流,我们会分别获取每个碎片及其保存的子碎片,并且把它们以正确的顺序排列在一起形成数组,从而最终获取到了 DAG 节点的完整数据。

getChildren 函数返回的内部函数处理方法如下:
  1. 如果当前节点对象是一个缓冲区对象,即当前节点是一个叶子节点,那么直接返回一个空的流,因为没有办法再次进行遍历。
    if (Buffer.isBuffer(node)) {
      return empty()
    }
    
  2. 调用静态方法把当前节点对象转换成一个文件对象。
    let file

    try {   file = UnixFS.unmarshal(node.data) } catch (err) {   return error(err) }

  3. 判断流的开始位置。
    const nodeHasData = Boolean(file.data && file.data.length)
    if (nodeHasData && node.links.length) {
      streamPosition += file.data.length
    }
    
  4. 处理当前节点包含的 Link 信息,并过滤掉不在指定范围内的 Link 信息,然后按顺序返回 Link 信息数组。
    const filteredLinks = node.links
      .map((link, index) => {
        const child = {
          link: link,
          start: streamPosition,
          end: streamPosition + file.blockSizes[index],
          size: file.blockSizes[index]
        }
    streamPosition = child.end
    return child
    
      })
      .filter((child) => {
        return (offset >= child.start && offset < child.end) || // child has offset byte
          (end > child.start && end <= child.end) || // child has end byte
          (offset < child.start && end > child.end) // child is between offset and end bytes
      })
    
  5. 如果最终返回的 Link 信息数组存在,则设置流的起始位置为第一个 Link 信息的开头位置。
    if (filteredLinks.length) {
      streamPosition = filteredLinks[0].start
    }
    
  6. 返回一个 pull 函数构成的流。
    return pull(
      once(filteredLinks),
      paramap((children, cb) => {
        dag.getMany(children.map(child => child.link.cid), (err, results) => {
          if (err) {
            return cb(err)
          }
      cb(null, results.map((result, index) =&gt; {
        const child = children[index]

        return {       start: child.start,       end: child.end,       node: result,       size: child.size     }   })) })

      }),
      flatten()
    )
    
    在这个流中,paramap 函数返回的流会调用 once 函数返回的一次性流,once 函数返回的一次性流会把 Link 信息数组传递给前者。而前者的处理函数会对 Link 信息数组中的每个碎片进行处理(这里只有一个 Link 信息数组,即只有 children 数组,而不是多个 children 数组,但是 children 数组包含了所有 Link 信息)。在 paramap 函数返回的流的处理函数中调用 IPLD 对象的 getMany 获取每个 Link 节点的数据,并对返回的数据进行整理,然后以整理后的数组为参数,调用下一个流---即 flatten 流的读取函数中指定的---回调函数,把最终的数组传递给它。最终,数组被 flatten 流进行扁平化处理后,传递给外部的 pull 函数中的流,即前面所看到的 pull 类库的 map 流的 read 函数中指定的那个函数,在这个函数中又会调用我们提供的 extractData 函数返回的内部函数来处理每一个节点对象。
extractData 函数返回的内部函数比较简单,主要是对获取到的每个碎片数据进行处理,然后返回对应的数组,它的代码如下,读者可自行分析,这里不再细讲。
function getData ({ node, start, end }) {
    let block

    if (Buffer.isBuffer(node)) {       block = node     } else {       try {         const file = UnixFS.unmarshal(node.data)

        if (!file.data) {           if (file.blockSizes.length) {             return           }

          return Buffer.alloc(0)         }

        block = file.data       } catch (err) {         throw new Error(`Failed to unmarshal node - ${err.message}`)       }     }

    if (block && block.length) {       if (streamPosition === -1) {         streamPosition = start       }

      const output = extractDataFromBlock(block, streamPosition, requestedStart, requestedEnd)

      streamPosition += block.length

      return output     }

    return Buffer.alloc(0)   }

文章标签: IPFS 技术指南
评论
登录 账号发表你的看法,还没有账号?立即免费 注册