8BTCCI: 11407.87 -4.61% 8BTCVI: 6200.55 -6.11% 24H成交额: ¥5601.85亿 +30.74% 总市值: ¥15894.49亿 -4.47%
精通IPFS:IPFS 获取内容之上篇

精通IPFS:IPFS 获取内容之上篇

乔疯 发布在 技术指南 链圈子 68631

前面几篇文章,我们分析了保存文件的过程,我们知道如果一个文件不被任何人访问,那它是保存在本地,至少经过一次访问之后文件才能保存在 IPFS 网络中。今天我们来看下怎么把已经保存在 IPFS 网络中的文件下载到本地,我们以著名的喵星人这个图片为例来分析下怎么下载文件。喵星人这个图片的 hash 为 QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ ,下载它的示例代码如下:
const {createNode} = require('ipfs')
const fs = require('fs');

const node = createNode()

node.on('ready', async () => {   const file = await node.get('QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ/cat.jpg') 

  fs.writeFile('cat.jpg',file[0].content,(err) => {

    if (err) throw err;     console.log('cat saved!');   }); })

运行这段代码,就会在当前目录下生成喵星人,让我们先来欣赏下美丽的喵星人吧!

欣赏完喵星人,接下来就看代码是如何执行的。上面代码的主体是 get 方法,这个方法位于 IPFS 的 core/components/files-regular/get.js 文件,它的内容如下:
(ipfsPath, options, callback) => {
    if (typeof options === 'function') {
      callback = options
      options = {}
    }

    options = options || {}

    pull(       self.getPullStream(ipfsPath, options),       pull.asyncMap((file, cb) => {         if (file.content) {           pull(             file.content,             pull.collect((err, buffers) => {               if (err) { return cb(err) }               file.content = Buffer.concat(buffers)               cb(null, file)             })           )         } else {           cb(null, file)         }       }),       pull.collect(callback)     ) }

这个匿名函数接收我们传递给它的图片路径,通过 pull-stream 类库的 pull 函数,调用 IPFS 对象的 getPullStream 方法 获取文件,并在获取到文件之后,调用 pull.asyncMap 流对获取到的文件进行处理,最后把最终的文件传递给 pull.collect 流进行处理,后者直接调用我们提供的回调函数把最终的文件交给用户来处理。

通过上面的简单分析,我们可以发现从 IPFS 网络中获取文件主要是通过 IPFS 对象的 getPullStream 方法,而这个方法是在创建 IPFS 对象的过程中在 core/index.js 文件中被注册为 IPFS 对象的一个方法,它的主要内容就是返回一个 pull-stream 类库的流,代码如下:

pull(
  exporter(ipfsPath, self._ipld, options),
  pull.map(file => {
    file.hash = file.cid.toString()
    delete file.cid
    return file
  })
)
上面的代码定义在 core/components/files-regular/get-pull-stream.js 文件,主体也是 pull-stream 类库的 pull 函数,第一个参数是调用 ipfs-unixfs-exporter 类库函数返回的内部流,这个内部流会从 IPFS 网络中获取我们想要的文件,第二个参数是 pull-stream 类库的 map 流,它根据获取到的文件对象的 CID 生成文件的哈希。

接下来,我们开始看下 exporter 函数,它的代码位于 ipfs-unixfs-exporter 类库的 index.js 文件中。它的执行逻辑如下:

  1. 对传递进来的路径进行处理,返回一个对象。返回的对象中包含了基本路径和中间路径。
    let dPath
    try {
        dPath = pathBaseAndRest(path)
    } catch (err) {
        return error(err)
    }
    
    这里我们传递进来的路径是 QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ/cat.jpg,所以 dPath 对象的基本路径就是 QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ,中间路径的数组为空。
  2. 获取不包括最终名称在内的路径长度
    const pathLengthToCut = join([dPath.base].concat(dPath.rest.slice(0, dPath.rest.length - 1))).length
    
  3. 根据基本路径生成 CID 对象。
    const cid = new CID(dPath.base)
    
    CID 对象包括四个部分:multibase、版本号、multicodec、multihash。当我们传递 QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ 到 CID 构造函数中时,函数内部会设置版本号为 0,multicodec 为 dag-pb,同时调用多重哈希函数把 Base58 字符串转化为 multihash。
  4. 最后,使用 pull 函数返回 pull-stream 类库中使用的流。
    return pull(
        values([{
          cid,
          name: dPath.base,
          path: dPath.base,
          pathRest: dPath.rest,
          depth: 0
        }]),
        createResolver(dag, options),
        filter(Boolean),
        map((node) => {
          return {
            depth: node.depth,
            name: node.name,
            path: options.fullPath ? node.path : finalPathFor(node),
            size: node.size,
            cid: node.cid,
            content: node.content,
            type: node.type
          }
        })
    )
    
    返回的这个流被外面 get-pull-stream.js 文件中的 pull 函数所调用,从而从 IPFS 网络中获取指定的文件。

    在上面的代码简单解释如下:

    • values 函数是 pull-stream 类库中定义的 source 流,它会创建一个从数组或对象读取值,然后终止的源流,这里我们用传递进来的路径及其生成 CID 生成一个对象,并生成一个数组来做为流的内容。
    • createResolver 函数是当前目录下 resolve.js 文件中的 createResolver 函数,它接收 dag 对象,并返回一个 pull-stream 流对象,这里 dag 对象是 IPFS 对象中的 _ipld 对象。这里返回的流对象,从前面一个流中读取要获取的文件,然后调用 dag 对象的 get 方法来获取指定的文件,具体处理下面进行分析。
    • map 函数是 pull-stream 类库中定义的 through 流,它使用用户指定的转换函数对数组中的每个元素进行转换。这里的处理函数比较简单,根据前面流返回的对象,生成并返回另一个对象。这里返回的对象,就是我们最终在示例程序中看到的对象,除了它没有 hash 属性,并且 cid 被删除。
下面,我们重点看下 createResolver 这个函数,它定义于 ipfs-unixfs-exporter 类库的 resolve.js 文件中,它的主体是生成并返回一个 pull-stream 类库中使用 pull 函数,代码具体如下:
pull(
    paramap((item, cb) => {
      if ((typeof item.depth) !== 'number') {
        return error(new Error('no depth'))
      }

      if (item.object) {         return cb(null, resolveItem(null, item.object, item, options))       }

      waterfall([         function (done) {           dag.get(item.cid, done)         },         function (node, done) {            // node 为区块对象反序列化后的结果,可能为文件总的 Dag,也可能为某个文件(在不分块的情况下)。            done(null, resolveItem(item.cid, node.value, item, options))         }       ], cb)     }),     flatten(),     filter(Boolean),     filter((node) => node.depth <= options.maxDepth) )

上面代码简单解释如下:
  1. 首先,调用 paramap 函数,返回 pull-paramap 流,在这个流中使用异步类库的 waterfall 方法,依次调用 IPLD 对象的 get 方法,从本地或其他节点获取区块对象;在得到区块对象之后,调用 resolveItem 方法,处理得到的区块对象(这里得到的区块对象,可能为一个完整的文件,也可能是文件的一个碎片,还可能是一个目录等)。
    pull-paramap 流是一个 pull-stream 流,它接收 3个参数,第一个参数类型为函数,函数签名为 (data, cb),在函数中执行用户自定义的业务逻辑,第二个和第三参数都是可选的。

    pull-paramap 并行地从前面的流中读取数据,调用第一个参数指定函数进行处理,把函数调用结果以数组的形式返回给后面的流。数组中结果的顺序与前面的流提供的源数据顺序保持一致。

    这里 paramap 函数的异步处理函数内容如下:
    • 检查当前对象的 depth 属性是否不是数字,如果不是数字,则返回错误。这里当前对象即为前面 values 流中生成并返回的对象。
    • 如果当前对象的 object 属性存在,则调用 resolveItem 解释当前对象,并把结果传递给下一个函数。我们的对象没有 object 属性,所以这里的代码不会执行。
    • 调用异步类库的 waterfall 方法,依次调用 IPLD 对象的 get 方法,从本地或其他节点获取区块对象;在得到区块对象之后,调用 resolveItem 方法,处理得到的区块对象。
  2. 然后,调用 pull-stream 类库的 flattenfilter 两个 through 流进返回的区块对象进行处理。
  3. 最后,调用 pull-stream 类库的 filter 流,过滤超过指定深度区块对象。
从上面的解释中我们可以发现最主要的的业务逻辑在于获取区块对象和对获取到的区块对象进行处理,下面我们对这两个方面深入分析。

1、获取区块对象

上面的代码,我们是通过调用 dag 对象的 get 方法来获取区块对象,它的执行逻辑如下:
dag 对象的类型是 ipld 类库中的 IPLDResolver 对象,在 IPFS 对象初始化时生成并设置在 IPFS 对象上面。
  1. 如果路径参数类型为函数,则重新设置参数。根据上面调用,我们这里的路径参数是 waterfall 提供的内部函数 done,所以会执行下面的代码重新设置下面两个变量。
    if (typeof path === 'function') {
      callback = path
      path = undefined
    }
    
  2. 如果选项参数为函数,则重新设置参数。根据上面的调用,这里没有选项参数,所以不会执行下面的代码。
    if (typeof options === 'function') {
      callback = options
      options = {}
    }
    
  3. 处理路径参数
    if (typeof path === 'string') {
      path = joinPath('/', path)
        .substr(1)
        .split(osPathSep)
        .join('/')
    }
    
  4. 如果路径参数为空串或没有定义,则调用内部函数 _get 进行处理,并在它的异步回调函数中返回其结果。内部函数 _get 内部通过 waterfall 函数来处理,具体代码如下:
    waterfall([
      (cb) => this._getFormat(cid.codec, cb),
      (format, cb) => this.bs.get(cid, (err, block) => {
        if (err) return cb(err)
        cb(null, format, block)
      }),
      (format, block, cb) => {
        format.util.deserialize(block.data, (err, deserialized) => {
          if (err) {
            return cb(err)
          }
          cb(null, deserialized)
        })
      }
    ], callback)
    
    waterfall 函数内部,首先调用 _getFormat 方法,根据 CID 对象来获取其所对用的格式化对象;然后调用区块服务对象的 get 方法来获得区块对象;最后,使用格式化对象的工具对象的反序列化方法,反序列化区块服务对象获得的区块数据。

    区块服务对象位于 ipfs-block-service 类库的 index.js 文件中,它的 get 方法,根据是否有 bitswap 对象决定是从 bitswap 对象获取区块对象,还是从本地仓库中获取。它的代码如下:

    get (cid, callback) {
        if (this.hasExchange()) {
          this._bitswap.get(cid, callback)
        } else {
          this._repo.blocks.get(cid, callback)
        }
    }
    
    当系统启动过程中,在处理 init-docs 目录内的帮助文档时,bitswap 对象才有空,即只有这个过程才会直接从本地仓库保存/获取区块对象,其他情况都是调用 bitswap 对象的 get 方法来获取区块对象。

    bitswap 对象的 get 方法委托自身的 getMany 方法进行处理。后者的处理过程如下:

    • 初始化内部所用的变量:wantList 数组为空,promptedNetwork 为假,pendingStart 为请求的所有 CID 数量。
    • 生成一个从其他节点获取区块对象的函数对象 getFromOutside
    • 调用异步类库的 map 函数,遍历每一个要请求的区块对象。针对每一个要请求的区块,使用异步类库的 waterfall 函数进行处理。waterfall 函数处理如下:
      • 调用区块存储对象的 has 方法,检查本地是否有请求的区块;
      • 如果本地有请求的区块,则:如果已经处理完所有请求的 CID,那么调用 WantManager 的 wantBlocks 方法获取需要的区块;调用区块存储对象的 get 方法从本地加载区块并返回。
      • 如果内部变量 promptedNetwork 为假,则:设置这个变量为真(保证只有一个请求可以马上处理);调用网络对象的 findAndConnect 方法,查找第一个请求的 CID。
      • 调用函数对象 getFromOutside 进行处理。这个函数把指定的 CID 放入 wantList 数组中,然后调用 notifications 对象的 wantBlock 方法通知系统,我们想要这个区块;如果已经处理完所有请求的 CID,那么调用 WantManager 的 wantBlocks 方法获取需要的区块。
    notifications 对象是一个内部模块,用来跟踪收到的区块、想要的区块、不想要的区块等。这个函数的第一个参数就是请求的区块 CID,第二个参数是一个函数,用来在收到一个区块后,从想要列表中取消这个区块,避免再次请求别的节点,第三个参数用来取消请求某个区块。
  5. 当路径参数不为空串,并且有值时,调用异步类库的 doUntil 函数进行处理。doUntil 函数内部的业务处理与上面基本类似,读者可以自己分析。

2、解析区块对象

当调用 dag 对象的 get 方法获取到区块对象之后,是不是处理流程已经结束了?不是这样的,我们可以想像下,Unix 文件系统是一个树状的结构,从根目录 / 开始,然后是子目录,子目录下面又可以是孙目录或文件,孙目录下面又可以重孙目录或文件,子子孙孙无穷尽也。除了目录之外,当我们要获取的文件如果很大,在上传时候也会被切分类似目录树结构的结构,最顶层为文件总的 DAGNode 对象,它通过 DAGLink 连接到子碎片,子碎片又可以通过 DAGLink 连接到孙碎片,孙碎片又可以通过 DAGLink 连接到重孙碎片,子子孙孙无穷尽也。所以获取到区块对象之后,异步类库的 waterfall 函数通过调用它的第二个参数,从而调用 resolveItem 函数来解析获取到的区块对象,根据获取到的区块来获取完整的区块对象。

resolveItem 函数实现为直接委托给另一个函数 resolve 进行处理,代码如下:

function resolveItem (cid, node, item, options) {
    return resolve({
      cid,
      node,
      name: item.name,
      path: item.path,
      pathRest: item.pathRest,
      dag,
      parentNode: item.parent || parent,
      depth: item.depth,
      options
    })
}
resolve 函数处理过程如下:
  1. 调用函数 typeOf 检测区块对象的真实类型。
    try {
      type = typeOf(node)
    } catch (err) {
      return error(err)
    }
    
    区块对象可能为 directory、hamt-sharded-directory、file、object、raw,每种类型都有特定的处理器进行解析。
  2. resolvers 对象中获取对应类型的解析器
    const nodeResolver = resolvers[type]
    
    resolvers 对象定义在文件开头,内容如下:
    const resolvers = {
      directory: require('./dir-flat'),
      'hamt-sharded-directory': require('./dir-hamt-sharded'),
      file: require('./file'),
      object: require('./object'),
      raw: require('./raw')
    }
    
  3. 调用 createResolver 函数,创建 resolveDeep
  4. 调用 nodeResolver 函数,解析指定的区块对象。对于不同的类型,nodeResolver 函数是不同的。当获取的区块对象类型为目录时,函数为 dir-flat.js 中定义的 dirExporter 函数;当获取的区块对象类型为文件时,函数为 file.js 文件中定义的函数;当获取的区块对象类型为对象时,函数 object.js 文件中定义的函数。当我们获取喵星人时,涉及到两种类型,即目录和文件,下面我们就以这两种类型进行分析。dirExporter 函数执行过程如下:
    • 设置要获取的第一个对象
      const accepts = pathRest[0]
      
      pathRest 是我们在调用 get 方法时,根据提供的路径生成的路径数组,数组中不包括路径的基础部分。对于我们的例子,数组只有一个元素,即 cat.jpg
    • 生成一个代表当前目录的变量。
      const dir = {
          name: name,
          depth: depth,
          path: path,
          cid,
          size: 0,
          type: 'dir'
      }
      
    • 如果当前获取对象的深度超过了选项指定的最大深度,则返回 pull-stream 类库的源流 values
      if (options.maxDepth && options.maxDepth <= depth) {
          return values([dir])
      }
      
    • 生成一个流数组。
      const streams = [
          pull(
            values(node.links),
            filter((item) => accepts === undefined || item.name === accepts),
            map((link) => ({
              depth: depth + 1,
              size: 0,
              name: link.name,
              path: path + '/' + link.name,
              cid: link.cid,
              linkName: link.name,
              pathRest: pathRest.slice(1),
              type: 'dir'
            })),
            resolve
          )
      ]
      
      上面生成的流数组,在函数尾部会通过 pull-cat 类库进行级连调用,上面代码最终执行过程描述如下:遍历当前目录的所有连接;除非掉不属于当前请求的连接;把所有符合条件的连接生成对应的对象;最后,把生成的对象传递给 resolve 流,即调用 createResolver 函数返回的流,这个流我们在前面已经分析过,这里不再细讲。

      在我们获取喵星人的例子当中,我们指定的路径为 QmW2WQi7j6c7UgJTarActp7tDNikE4B2qXtFCfLPdsgaTQ/cat.jpg/ 前面的路径为基本路径,它代表了一个目录,这个目录中有一个文件,这个文件即是喵星人,也即 node.links 指向的是喵星人的 CID。通过 pull-cat 类库的调用,在 createResolver 函数返回的流中我们会真正请求喵星人,具体文件的处理,我们在下面进行分析。

    • 如果路径中除了基本路径之外没有别的路径,或者选项指定为完全路径,则使用 dir 变量生成一个 values 流,并放在流数组 streams 的最前面。
    • 调用 pull-cat 类库进行流处理,上体处理过程见前面描述。
    以上即是 IPFS 处理目录的过程。

    下面我们来看 IPFS 是如何具体文件的,正如我们在前面所提到的,每个文件在保存到 IPFS 网络中都可能进行分片,即把大的文件分成小的碎片,每个碎片有自己的哈希,根据碎片的哈希生成对应的 DAGLink,以碎片在文件中出现的顺序,使用这些 DAGLink 生成连接数组,使用连接数组生成最终的顶层 DAGNode 对象,以此来表示文件。我们的喵星人同样也被分成了两个碎片,在前面分析中,请求目录之后,通过 pull-cat 类库的调用,再次请求 createResolver 函数返回的流的过程中,我们会请求喵星人总的 DAGNode 对象,当调用 nodeResolver 函数时,这次会选择 file.js 文件进行请求处理,它的执行过程如下:

    • 设置要获取的第一个对象
      const accepts = pathRest[0]

      if (accepts !== undefined && accepts !== path) {     return empty() }

      这次 pathRest 数组为空,所以这里 accepts 是未定义。
    • 调用 UnixFS 的静态方法 unmarshal 方法,从区块对象的 data 属性中解组出 Uninx 文件对象。
      try {
          file = UnixFS.unmarshal(node.data)
      } catch (err) {
          return error(err)
      }
      
    • 获取文件大小、指定的长度和偏移量
      const fileSize = file.fileSize()

      let offset = options.offset let length = options.length

      if (offset < 0) {     return error(new Error('Offset must be greater than or equal to 0')) }

      if (offset > fileSize) {     return error(new Error('Offset must be less than the file size')) }

      if (length < 0) {     return error(new Error('Length must be greater than or equal to 0')) }

    • 如果长度为 0,则生成并返回 pull-stream 类库的 once 流。
      if (length === 0) {
          return once({
            depth: depth,
            content: once(Buffer.alloc(0)),
            name: name,
            path: path,
            cid,
            size: fileSize,
            type: 'file'
          })
      }
      
    • 重新计算偏移量和文件长度。
      if (!offset) {
          offset = 0
      }

      if (!length || (offset + length > fileSize)) {     length = fileSize - offset }

    • 调用 streamBytes 函数,根据偏移量、长度及节点的连接数组,获取指定的内容。streamBytes 函数采用深度优先算法获取区块对象的所有碎片数据,它的结果是一个 pull-stream 类库的 through 流。代码如下:
      if (offset === fileSize || length === 0) {
          return once(Buffer.alloc(0))
      }

      const end = offset + length

      return pull(     traverse.depthFirst({       node,       start: 0,       end: fileSize     }, getChildren(dag, offset, end)),     map(extractData(offset, end)),     filter(Boolean) )

      pull-travers 类库中提供了深度优先、广度优先、叶子优先3种算法来遍历一颗树,这里我们使用了深度优先来遍历文件的所有碎片。
    • 生成并返回 pull-stream 类库的 values 流。返回的流在依次被用在 resolver.jscreateResolver 函数返回的流中,后者又被 ipfs-unixfs-exporter 类库中的 pull 函数中的 map 流所使用;ipfs-unixfs-exporter 类库中的 pull 函数中的 map 流又被 get-pull-stream.js 文件中的 pull.map 所使用,并且最终被 get.js 文件中的 pull.asyncMap 流的处理函数转换为 Buffer 对象,从而我们的程序从 Buffer 对象中读取出文件内容。
点击查看《精通IPFS系列》全部文章

评论
登录 账号发表你的看法,还没有账号?立即免费 注册