downloads

package
v1.0.2 Latest Latest
Warning

This package is not in the latest version of its module.

Go to latest
Published: Aug 1, 2024 License: MIT Imports: 32 Imported by: 0

Documentation

Index

Constants

View Source
const (
	// 定时任务,索引清单相关时间参数
	CheckForNewChecklistInitialDelay   = 1 * time.Second  // 初次调用的延迟时间
	CheckForNewChecklistInterval       = 40 * time.Second // 定时循环的常规间隔时间
	CheckForNewChecklistContextTimeout = 60 * time.Minute // 上下文超时时间

	// 定时任务,下载文件片段相关时间参数
	CheckForDownSnippetInterval       = 1 * time.Minute   // 定时循环的常规间隔时间
	CheckForDownSnippetContextTimeout = 180 * time.Minute // 上下文超时时间

	// 定时任务,合并文件相关时间参数
	CheckForMergeFilesInterval       = 10 * time.Second  // 定时循环的常规间隔时间
	CheckForMergeFilesContextTimeout = 200 * time.Minute // 上下文超时时间

	// 通道操作的超时设置
	TickerChannelTimeout = 20 * time.Second // 定时任务通道操作的超时时间
	TickerDownTimeout    = 50 * time.Second // 文件片段下载通道的超时时间

	// 通过令牌桶机制限制同时发送的片段数量,避免通道满载和过多并发请求导致的压力。
	TickerChannelBufferSize     = 20                     // 通道缓冲区大小
	EventDownSnippetChannelSize = 100                    // 通道容量常量
	TokenBucketSize             = 50                     // 令牌桶容量常量
	TokenBucketRefillInterval   = 100 * time.Millisecond // 令牌桶补充间隔
)

Variables

View Source
var (
	// 文件下载请求清单(请求)
	PubSubDownloadChecklistRequestTopic = fmt.Sprintf("defs@pubsub/download/checklist/request/%s", version)

	// 文件下载请求清单(回应)
	PubSubDownloadChecklistResponseTopic = fmt.Sprintf("defs@pubsub/download/checklist/response/%s", version)
)
View Source
var (
	// 文件下载请求清单(回应)
	StreamDownloadChecklistResponseProtocol = fmt.Sprintf("defs@stream/download/checklist/response/%s", version)

	// 文件下载本地
	StreamDownloadLocalProtocol = fmt.Sprintf("defs@stream/download/local/%s", version)

	// 异步发送文件
	StreamAsyncDownloadProtocol = fmt.Sprintf("defs@stream/async/download/%s", version)
)

Functions

func HandleFileDownloadRequestPubSub

func HandleFileDownloadRequestPubSub(opt *opts.Options, afe afero.Afero, p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub, res *streams.RequestMessage)

HandleFileDownloadRequestPubSub 处理文件下载请求

func HandleFileDownloadResponsePubSub

func HandleFileDownloadResponsePubSub(p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub, download *DownloadManager, res *streams.RequestMessage)

HandleFileDownloadResponsePubSub 处理文件下载响应

func LoadTasksFromFile

func LoadTasksFromFile(filePath string) (map[string]*DownloadTaskSerializable, error)

LoadTasksFromFile 从文件加载任务 参数:

  • filePath: string 文件路径

返回值:

  • map[string]*DownloadTaskSerializable: 任务映射表
  • error: 如果发生错误,返回错误信息

func RegisterDownloadStreamProtocol

func RegisterDownloadStreamProtocol(input RegisterStreamProtocolInput)

RegisterDownloadStreamProtocol 注册下载流

func RegisterPubsubProtocol

func RegisterPubsubProtocol(lc fx.Lifecycle, input RegisterPubsubProtocolInput)

RegisterPubsubProtocol 注册订阅

func SaveTasksToFile

func SaveTasksToFile(filePath string, tasks map[string]*DownloadTaskSerializable) error

SaveTasksToFile 将任务保存到文件 参数:

  • filePath: string 文件路径
  • tasks: map[string]*DownloadTaskSerializable 任务映射表

返回值:

  • error: 如果发生错误,返回错误信息

Types

type AsyncDownload

type AsyncDownload struct {
	// contains filtered or unexported fields
}

AsyncDownload 表示需要异步下载的文件片段信息

type DownloadChan

type DownloadChan struct {
	TaskID           string  // 任务唯一标识
	TotalPieces      int     // 文件总分片数
	DownloadProgress int     // 下载进度百分比
	IsComplete       bool    // 下载任务是否完成
	SegmentID        string  // 文件片段唯一标识
	SegmentIndex     int     // 文件片段索引,表示该片段在文件中的顺序
	SegmentSize      int     // 文件片段大小,单位为字节
	UsesErasureCodes bool    // 是否使用纠删码技术
	NodeID           peer.ID // 存储该文件片段的节点ID
	DownloadTime     int64   // 下载完成时间的时间戳
}

DownloadChan 用于刷新下载任务的通道

type DownloadFile

type DownloadFile struct {
	FileID      string   // 文件唯一标识
	Name        string   // 文件名,包括扩展名,描述文件的名称
	Size        int64    // 文件大小,单位为字节,描述文件的总大小
	ContentType string   // MIME类型,表示文件的内容类型,如"text/plain"
	Segments    sync.Map // 使用并发安全的 sync.Map 存储文件分片信息,键是分片索引 (int),值是指向 FileSegment 结构体的指针 (*FileSegment)
}

DownloadFile 包含待下载文件的详细信息及其分片信息

func (*DownloadFile) AddSegment

func (df *DownloadFile) AddSegment(index int, segment *FileSegment)

AddSegment 添加文件分片信息 参数:

  • index: int 分片索引
  • segment: *FileSegment 文件分片信息

func (*DownloadFile) AddSegmentNodes

func (df *DownloadFile) AddSegmentNodes(idx int, peers []peer.ID)

AddSegmentNodes 添加文件片段所在的节点信息 参数:

  • idx: int 文件片段的索引。
  • peers: []peer.ID 文件片段所在的节点ID。

func (*DownloadFile) CheckMissingNodes

func (df *DownloadFile) CheckMissingNodes() bool

CheckMissingNodes 检查文件信息中不是纠删码片段的片段,是否存在节点ID是空的 如果有存在节点ID为空的片段,返回true,表示还有切片的节点ID没有拿到,否则返回false 返回值:

  • bool: 是否存在节点ID为空的片段

func (*DownloadFile) DeleteSegment

func (df *DownloadFile) DeleteSegment(index int)

DeleteSegment 删除文件分片信息 参数:

  • index: int 分片索引

func (*DownloadFile) DownloadCompleteCount

func (df *DownloadFile) DownloadCompleteCount() int

DownloadCompleteCount 检查已经完成的数量 返回值:

  • int: 已经完成的文件片段数量

func (*DownloadFile) GetPendingSegmentsForNode

func (df *DownloadFile) GetPendingSegmentsForNode(ID peer.ID) map[int]string

GetPendingSegmentsForNode 获取特定节点下的待下载文件片段索引和唯一标识的映射 参数:

  • ID: peer.ID 节点ID

返回值:

  • map[int]string: 待下载文件片段的索引和唯一标识的映射

func (*DownloadFile) GetSegment

func (df *DownloadFile) GetSegment(index int) (*FileSegment, bool)

GetSegment 获取文件分片信息 参数:

  • index: int 分片索引

返回值:

  • *FileSegment 文件分片信息
  • bool 表示是否找到该索引的分片

func (*DownloadFile) GetSegmentID

func (df *DownloadFile) GetSegmentID(index int) string

GetSegmentID 获取文件片段的唯一标识 参数:

  • index: int 文件片段的索引

返回值:

  • string 文件片段的唯一标识

func (*DownloadFile) GetSegmentsToDownload

func (df *DownloadFile) GetSegmentsToDownload() []int

GetSegmentsToDownload 获取需要下载的文件片段索引 返回值:

  • []int: 需要下载的文件片段索引数组

func (*DownloadFile) IsRsCodes

func (df *DownloadFile) IsRsCodes(index int) bool

IsRsCodes 检查文件片段是否是纠删码片段 参数:

  • index: int 文件片段的索引

返回值:

  • bool 是否是纠删码片段

func (*DownloadFile) IsSegmentCompleted

func (df *DownloadFile) IsSegmentCompleted(opt *opts.Options, afe afero.Afero, index int, subDir string) (bool, bool, error)

IsSegmentCompleted 检查文件片段是否存在以及是否已经下载完成 参数:

  • opt: *opts.Options 文件存储选项配置
  • afe: afero.Afero 文件系统接口
  • index: int 文件片段的索引
  • subDir: string 子目录路径

返回值:

  • bool: 文件片段是否存在
  • bool: 文件片段是否下载完成
  • error: 错误信息

func (*DownloadFile) ListAllSegments

func (df *DownloadFile) ListAllSegments() map[int]*FileSegment

ListAllSegments 列出所有文件分片信息 返回值:

  • map[int]*FileSegment 所有文件分片信息的映射

func (*DownloadFile) SegmentCount

func (df *DownloadFile) SegmentCount() int

SegmentCount 返回当前文件的分片数量 返回值:

  • int: 当前文件的分片数量

func (*DownloadFile) SetNodeInactive

func (df *DownloadFile) SetNodeInactive(ID peer.ID)

SetNodeInactive 将特定节点下的所有文件片段的节点状态设置为不可用 参数:

  • ID: peer.ID 节点ID

func (*DownloadFile) SetSegmentStatus

func (df *DownloadFile) SetSegmentStatus(index int, status SegmentDownloadStatus)

SetSegmentStatus 设置文件片段的下载状态 参数:

  • index: int 文件片段的索引
  • status: SegmentDownloadStatus 文件片段的下载状态

func (*DownloadFile) UpdateSegmentNodes

func (df *DownloadFile) UpdateSegmentNodes(index int, peers []peer.ID)

UpdateSegmentNodes 更新文件分片的节点信息 参数:

  • index: int 分片索引
  • peers: []peer.ID 节点ID列表

func (*DownloadFile) UpdateSegmentStatus

func (df *DownloadFile) UpdateSegmentStatus(index int, status SegmentDownloadStatus)

UpdateSegmentStatus 更新文件分片的下载状态 参数:

  • index: int 分片索引
  • status: SegmentDownloadStatus 新的下载状态

type DownloadManager

type DownloadManager struct {
	Mu              sync.Mutex               // 用于保护状态的互斥锁
	Tasks           map[string]*DownloadTask // 下载任务的映射表
	DownloadChan    chan *DownloadChan       // 下载状态更新通道,用于通知外部下载进度和状态
	SaveTasksToFile chan struct{}            // 保存任务至文件通道
	AsyncDownload   chan *AsyncDownload      // 需要异步下载的文件片段信息
	// contains filtered or unexported fields
}

DownloadManager 管理所有下载任务,提供文件下载的统一入口和管理功能 它负责协调下载任务的执行,管理任务的生命周期,以及通知任务的状态更新和错误事件

func (*DownloadManager) CancelDownload

func (manager *DownloadManager) CancelDownload(
	taskID string,
) error

CancelDownload 取消下载操作

func (*DownloadManager) ChannelEvents

func (manager *DownloadManager) ChannelEvents(opt *opts.Options, afe afero.Afero, p2p *dep2p.DeP2P)

ChannelEvents 通道事件

func (*DownloadManager) ClearDownloadTask

func (manager *DownloadManager) ClearDownloadTask()

ClearDownloadTask 清空文件的下载任务

func (*DownloadManager) ContinueDownload

func (manager *DownloadManager) ContinueDownload(taskID string) error

ContinueDownload 继续下载操作 参数:

  • taskID: string 任务唯一标识,用于区分和管理不同的下载任务

返回值:

  • error 如果发生错误,返回错误信息

func (*DownloadManager) GetDownloadChan

func (manager *DownloadManager) GetDownloadChan() chan *DownloadChan

GetDownloadChan 返回下载状态更新通道

func (*DownloadManager) NewDownload

func (manager *DownloadManager) NewDownload(
	opt *opts.Options,
	afe afero.Afero,
	p2p *dep2p.DeP2P,
	pubsub *pubsub.DeP2PPubSub,
	fileID string,
	ownerPriv *ecdsa.PrivateKey,
	segmentNodes ...map[int][]peer.ID,
) (*DownloadSuccessInfo, error)

NewDownload 新下载操作 参数:

  • opt: *opts.Options 文件存储选项配置。
  • afe: afero.Afero 文件系统接口。
  • p2p: *dep2p.DeP2P 网络主机。
  • pubsub: *pubsub.DeP2PPubSub 网络订阅。
  • fileID: string 文件唯一标识。
  • ownerPriv: *ecdsa.PrivateKey 所有者的私钥。
  • segmentNodes: ...map[int][]peer.ID 文件片段所在节点。

返回值:

  • *DownloadSuccessInfo: 文件下载成功后的返回信息。
  • error: 如果发生错误,返回错误信息。

func (*DownloadManager) PauseDownload

func (manager *DownloadManager) PauseDownload(
	taskID string,
) error

PauseDownload 暂停下载操作

func (*DownloadManager) PeriodicSave

func (manager *DownloadManager) PeriodicSave(filePath string, interval time.Duration)

PeriodicSave 定时保存任务数据到文件 参数:

  • filePath: string 文件路径
  • interval: time.Duration 保存间隔

func (*DownloadManager) ReceivePendingSegments

func (manager *DownloadManager) ReceivePendingSegments(downloadMaximumSize int64, requesterAddress string, taskID, fileID string, segments map[int]string)

ReceivePendingSegments 用于接收未回复的文件片段并将其传递给异步下载通道 参数:

  • requesterAddress string 请求方节点地址
  • pendingSegments map[int]string 剩余的文件片段(索引 -> 文件片段ID)

func (*DownloadManager) RegisterTask

func (manager *DownloadManager) RegisterTask(opt *opts.Options, afe afero.Afero, p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub, task *DownloadTask)

RegisterTask 向管理器注册一个新的下载任务。 参数:

  • task: *DownloadTask 为准备注册的下载任务。

func (*DownloadManager) SaveTasksToFileSingleChan

func (manager *DownloadManager) SaveTasksToFileSingleChan()

SaveTasksToFileChan 保存任务至文件的通知通道

type DownloadStatus

type DownloadStatus string

下载任务的状态

const (
	StatusPending     DownloadStatus = "pending"     // 待下载
	StatusDownloading DownloadStatus = "downloading" // 下载中
	StatusCompleted   DownloadStatus = "completed"   // 下载完成
	StatusFailed      DownloadStatus = "failed"      // 下载失败
	StatusPaused      DownloadStatus = "paused"      // 下载暂停
)

type DownloadSuccessInfo

type DownloadSuccessInfo struct {
	TaskID       string // 任务唯一标识,用于区分和管理不同的下载任务
	FileID       string // 文件唯一标识,用于在系统内部唯一区分文件
	DownloadTime int64  // 文件开始下载的时间
}

DownloadSuccessInfo 用于封装文件下载成功后的返回信息

type DownloadTask

type DownloadTask struct {
	TaskID       string            // 任务唯一标识
	File         *DownloadFile     // 待下载的文件信息
	TotalPieces  int               // 文件总片数(数据片段和纠删码片段的总数)
	DataPieces   int               // 数据片段的数量
	OwnerPriv    *ecdsa.PrivateKey // 所有者的私钥
	Secret       []byte            // 文件加密密钥
	UserPubHash  []byte            // 用户的公钥哈希
	Progress     util.BitSet       // 下载任务的进度,表示为0到100之间的百分比
	CreatedAt    int64             // 任务创建的时间戳
	UpdatedAt    int64             // 最后一次下载成功的时间戳
	MergeCounter int               // 用于跟踪文件合并操作的计数器

	TickerChecklist   chan struct{} // 定时任务,通知检查是否需要下载新的索引清单的通道
	TickerDownSnippet chan struct{} // 定时任务,通知检查是否需要下载新的文件片段的通道
	TickerMergeFile   chan struct{} // 定时任务,通知检查是否需要执行文件合并操作的通道

	ChecklistDone   chan struct{} // 用于通知索引清单完成,关闭定时任务的通道
	DownSnippetDone chan struct{} // 用于通知下载片段完成,关闭定时任务的通道
	MergeFileDone   chan struct{} // 用于通知文件合并完成,关闭定时任务的通道

	EventChecklist   chan struct{} // 通道事件,通知下载新的索引清单的通道
	EventDownSnippet chan int      // 通道事件,通知下载新的文件片段的通道
	EventMergeFile   chan struct{} // 通道事件,通知执行文件合并操作的通道

	DownloadTaskDone chan struct{} // 用于通知文件下载任务完成的通道

	DownloadStatus DownloadStatus // 下载任务的状态
	StatusCond     *sync.Cond     // 用于状态变化的条件变量

	ChecklistTimeout    bool       `optional:"false" default:"false"` // 索引清单超时,默认为 false,且为必填项
	ChecklistStatusCond *sync.Cond // 用于索引清单状态变化的条件变量

	DownSnippetTimeout    bool       `optional:"false" default:"false"` // 下载片段超时,默认为 false,且为必填项
	DownSnippetStatusCond *sync.Cond // 用于片段下载状态变化的条件变量
	// contains filtered or unexported fields
}

DownloadTask 描述一个文件下载任务

func NewDownloadTask

func NewDownloadTask(ctx context.Context, taskID string, fileID string, ownerPriv *ecdsa.PrivateKey) (*DownloadTask, error)

NewDownloadTask 创建并初始化一个新的DownloadTask实例。 参数:

  • ctx: context.Context 上下文用于管理协程的生命周期。
  • mu: *sync.Mutex 互斥锁用于保护状态。
  • taskID: string 任务的唯一标识符。
  • fileID: string 待下载的文件信息。
  • ownerPriv: *ecdsa.PrivateKey 所有者的私钥。

返回值:

  • *DownloadTask: 新创建的DownloadTask实例。
  • error: 如果发生错误,返回错误信息。

func (*DownloadTask) ChannelEvents

func (task *DownloadTask) ChannelEvents(
	opt *opts.Options,
	afe afero.Afero,
	p2p *dep2p.DeP2P,
	pubsub *pubsub.DeP2PPubSub,
	manager *DownloadManager,
)

ChannelEvents 通道事件处理 参数:

  • opt: *opts.Options 文件存储选项配置,用于指定文件存储相关的选项。
  • afe: afero.Afero 文件系统接口,用于文件操作。
  • p2p: *dep2p.DeP2P DeP2P 网络主机,用于网络操作。
  • pubsub: *pubsub.DeP2PPubSub DeP2P 网络订阅系统,用于发布和订阅消息。
  • manager: *DownloadManager 管理下载任务的管理器,用于管理下载任务的相关操作。

func (*DownloadTask) ChannelEventsEventChecklist

func (task *DownloadTask) ChannelEventsEventChecklist(
	p2p *dep2p.DeP2P,
	pubsub *pubsub.DeP2PPubSub,
)

ChannelEventsEventChecklist 通道事件处理下载新的索引清单 参数:

  • p2p: *dep2p.DeP2P 表示 DeP2P 网络主机。
  • pubsub: *pubsub.DeP2PPubSub 表示 DeP2P 网络订阅系统。

func (*DownloadTask) ChannelEventsEventDownSnippet

func (task *DownloadTask) ChannelEventsEventDownSnippet(
	opt *opts.Options,
	afe afero.Afero,
	p2p *dep2p.DeP2P,
	pubsub *pubsub.DeP2PPubSub,
	downloadMaximumSize int64,
	index int,
	downloadChan chan *DownloadChan,
)

ChannelEventsEventDownSnippet 通道事件处理下载新的文件片段 参数:

  • opt: *opts.Options 文件存储选项配置
  • afe: afero.Afero 文件系统接口
  • p2p: *dep2p.DeP2P 表示 DeP2P 网络主机。
  • pubsub: *pubsub.DeP2PPubSub 表示 DeP2P 网络订阅系统。
  • downloadMaximumSize: int64 下载最大回复大小
  • index: int 需要下载的文件片段索引
  • downloadChan: chan *DownloadChan 下载状态更新通道

func (*DownloadTask) ChannelEventsEventMergeFile

func (task *DownloadTask) ChannelEventsEventMergeFile(opt *opts.Options, afe afero.Afero, p2p *dep2p.DeP2P, downloadChan chan *DownloadChan)

ChannelEventsEventMergeFile 通道事件处理执行文件合并操作 参数:

  • p2p: *dep2p.DeP2P 表示 DeP2P 网络主机。
  • pubsub: *pubsub.DeP2PPubSub 表示 DeP2P 网络订阅系统。

func (*DownloadTask) ChannelEventsTickerChecklist

func (task *DownloadTask) ChannelEventsTickerChecklist()

ChannelEventsTickerChecklist 通道事件处理检查是否需要下载新的索引清单

func (*DownloadTask) ChannelEventsTickerDownSnippet

func (task *DownloadTask) ChannelEventsTickerDownSnippet()

func (*DownloadTask) ChannelEventsTickerMergeFile

func (task *DownloadTask) ChannelEventsTickerMergeFile()

ChannelEventsTickerMergeFile 通道事件处理检查是否需要执行文件合并操作

func (*DownloadTask) CheckDataSegmentsCompleted

func (task *DownloadTask) CheckDataSegmentsCompleted() bool

CheckDataSegmentsCompleted 检查数据片段是否下载完成 参数:无 返回值:

  • bool: 如果数据片段下载完成返回true,否则返回false

func (*DownloadTask) CheckForDownSnippet

func (task *DownloadTask) CheckForDownSnippet()

CheckForDownSnippet 定时任务,检查是否需要下载文件片段。

func (*DownloadTask) CheckForMergeFiles

func (task *DownloadTask) CheckForMergeFiles()

CheckForMergeFiles 定时任务,检查是否需要合并文件。

func (*DownloadTask) CheckForNewChecklist

func (task *DownloadTask) CheckForNewChecklist()

CheckForNewChecklist 定时任务,检查是否需要下载新的索引清单。

func (*DownloadTask) ChecklistDoneSingleChan

func (task *DownloadTask) ChecklistDoneSingleChan()

ChecklistDoneSingleChan 向任务的通知索引清单完成,关闭定时任务的通道发送通知。 如果通道已满,会先丢弃旧消息再写入新消息,确保通道始终保持最新的通知。

func (*DownloadTask) DownSnippetDoneSingleChan

func (task *DownloadTask) DownSnippetDoneSingleChan()

DownSnippetDoneSingleChan 向任务的通知下载片段完成,关闭定时任务的通道发送通知。 如果通道已满,会先丢弃旧消息再写入新消息,确保通道始终保持最新的通知。

func (*DownloadTask) DownloadTaskDoneSingleChan

func (task *DownloadTask) DownloadTaskDoneSingleChan()

DownloadTaskDoneSingleChan 向任务的通知文件下载任务完成的通道发送通知。 如果通道已满,会先丢弃旧消息再写入新消息,确保通道始终保持最新的通知。

func (*DownloadTask) EventChecklistSingleChan

func (task *DownloadTask) EventChecklistSingleChan()

EventChecklistSingleChan 向任务的通知下载新的索引清单的通道发送通知。 如果通道已满,会先丢弃旧消息再写入新消息,确保通道始终保持最新的通知。

func (*DownloadTask) EventDownSnippetChan

func (task *DownloadTask) EventDownSnippetChan(index int)

EventDownSnippetChan 向任务的通知下载新的文件片段的通道写入索引,并处理通道已满的情况。 参数:

  • index: int 文件片段的索引。

func (*DownloadTask) EventMergeFileSingleChan

func (task *DownloadTask) EventMergeFileSingleChan()

EventMergeFileSingleChan 向任务的通知执行文件合并操作的通道发送通知。 如果通道已满,会先丢弃旧消息再写入新消息,确保通道始终保持最新的通知。

func (*DownloadTask) FromSerializable

func (task *DownloadTask) FromSerializable(serializable *DownloadTaskSerializable) error

FromSerializable 从可序列化的结构体恢复 DownloadTask 参数:

  • serializable: *DownloadTaskSerializable 可序列化的 DownloadTask 结构体

func (*DownloadTask) GetChecklistTimeout

func (task *DownloadTask) GetChecklistTimeout() bool

GetChecklistTimeout 获取当前的索引清单状态。

func (*DownloadTask) GetDownSnippetTimeout

func (task *DownloadTask) GetDownSnippetTimeout() bool

GetDownSnippetTimeout 获取当前的片段下载状态。

func (*DownloadTask) GetDownloadStatus

func (task *DownloadTask) GetDownloadStatus() DownloadStatus

GetDownloadStatus 获取当前下载任务的状态

func (*DownloadTask) MergeFileDoneSingleChan

func (task *DownloadTask) MergeFileDoneSingleChan()

MergeFileDoneSingleChan 向任务的通知文件合并完成,关闭定时任务的通道发送通知。 如果通道已满,会先丢弃旧消息再写入新消息,确保通道始终保持最新的通知。

func (*DownloadTask) SetChecklistTimeout

func (task *DownloadTask) SetChecklistTimeout(status bool)

SetChecklistTimeout 设置索引清单状态,并通知所有等待索引清单状态变化的goroutine。

func (*DownloadTask) SetDownSnippetTimeout

func (task *DownloadTask) SetDownSnippetTimeout(status bool)

SetDownSnippetTimeout 设置片段下载状态,并通知所有等待片段下载状态变化的goroutine。

func (*DownloadTask) SetDownloadStatus

func (task *DownloadTask) SetDownloadStatus(status DownloadStatus)

SetDownloadStatus 设置下载任务的状态,并通知所有等待区块同步状态变化的goroutine。

func (*DownloadTask) TickerChecklistSingleChan

func (task *DownloadTask) TickerChecklistSingleChan(parentCtx context.Context)

TickerChecklistSingleChan 向任务的通知检查是否需要下载新的索引清单的通道发送通知。 如果通道已满,会先丢弃旧消息再写入新消息,确保通道始终保持最新的通知。

func (*DownloadTask) TickerDownSnippetSingleChan

func (task *DownloadTask) TickerDownSnippetSingleChan(parentCtx context.Context)

TickerDownSnippetSingleChan 向任务的通知检查是否需要下载新的文件片段的通道发送通知。 如果通道已满,会先丢弃旧消息再写入新消息,确保通道始终保持最新的通知。

func (*DownloadTask) TickerMergeFileSingleChan

func (task *DownloadTask) TickerMergeFileSingleChan(parentCtx context.Context)

TickerMergeFileSingleChan 向任务的通知检查是否需要执行文件合并操作的通道发送通知。 如果通道已满,会先丢弃旧消息再写入新消息,确保通道始终保持最新的通知。

func (*DownloadTask) ToSerializable

func (task *DownloadTask) ToSerializable() (*DownloadTaskSerializable, error)

ToSerializable 将 DownloadTask 转换为可序列化的结构体 返回值:

  • *DownloadTaskSerializable: 可序列化的 DownloadTask 结构体

func (*DownloadTask) UpdateDownloadPieceInfo

func (task *DownloadTask) UpdateDownloadPieceInfo(payload *FileDownloadResponseChecklistPayload, peerID peer.ID)

UpdateDownloadPieceInfo 用于更新下载任务中特定片段的节点信息。 参数:

  • payload: *FileDownloadResponseChecklistPayload 下载响应的有效载荷。
  • peerID: peer.ID 节点的ID。

func (*DownloadTask) WaitFoDownloadStatusChange

func (task *DownloadTask) WaitFoDownloadStatusChange(targetStatuses ...DownloadStatus)

WaitFoDownloadStatusChange 阻塞当前goroutine,直到下载任务的状态发生变化。

func (*DownloadTask) WaitForChecklistTimeoutChange

func (task *DownloadTask) WaitForChecklistTimeoutChange()

WaitForChecklistTimeoutChange 阻塞当前goroutine,直到索引清单状态发生变化。

func (*DownloadTask) WaitForDownSnippetTimeoutChange

func (task *DownloadTask) WaitForDownSnippetTimeoutChange()

WaitForDownSnippetTimeoutChange 阻塞当前goroutine,直到片段下载状态发生变化。

func (*DownloadTask) WaitForSpecificChecklistTimeoutStatus

func (task *DownloadTask) WaitForSpecificChecklistTimeoutStatus(targetStatus bool)

WaitForSpecificChecklistTimeoutStatus 阻塞当前goroutine,直到索引清单状态发生变化并达到期望值。

func (*DownloadTask) WaitForSpecificDownSnippetTimeoutStatus

func (task *DownloadTask) WaitForSpecificDownSnippetTimeoutStatus(targetStatus bool)

WaitForSpecificDownSnippetTimeoutStatus 阻塞当前goroutine,直到片段下载状态发生变化并达到期望值。

type DownloadTaskSerializable

type DownloadTaskSerializable struct {
	TaskID       string         `json:"task_id"`       // 任务唯一标识
	File         *DownloadFile  `json:"file"`          // 待下载的文件信息
	TotalPieces  int            `json:"total_pieces"`  // 文件总片数(数据片段和纠删码片段的总数)
	DataPieces   int            `json:"data_pieces"`   // 数据片段的数量
	OwnerPriv    []byte         `json:"owner_priv"`    // 所有者的私钥
	Secret       []byte         `json:"secret"`        // 文件加密密钥
	UserPubHash  []byte         `json:"user_pub_hash"` // 用户的公钥哈希
	Progress     util.BitSet    `json:"progress"`      // 下载任务的进度,表示为0到100之间的百分比
	CreatedAt    int64          `json:"created_at"`    // 任务创建的时间戳
	UpdatedAt    int64          `json:"updated_at"`    // 最后一次下载成功的时间戳
	MergeCounter int            `json:"merge_counter"` // 用于跟踪文件合并操作的计数器
	Status       DownloadStatus `json:"status"`        // 下载任务的状态
}

DownloadTaskSerializable 是 DownloadTask 的可序列化版本

type DownloadToLocal

type DownloadToLocal struct {
	TaskID string // 任务唯一标识
	Index  int    // 分片索引,表示该片段在文件中的顺序
}

type FileDownloadResponseChecklistPayload

type FileDownloadResponseChecklistPayload struct {
	TaskID          string             // 任务唯一标识
	FileID          string             // 文件唯一标识
	Name            string             // 文件名,包括扩展名,描述文件的名称
	Size            int64              // 文件大小,单位为字节,描述文件的总大小
	ContentType     string             // MIME类型,表示文件的内容类型,如"text/plain"
	Checksum        []byte             // 文件的校验和
	SliceTable      map[int]*HashTable // 文件片段的哈希表,记录每个片段的哈希值,支持纠错和数据完整性验证
	AvailableSlices []int              // 本地存储的文件片段信息
}

文件下载响应(清单)

type FileSegment

type FileSegment struct {
	Index     int                   // 分片索引,表示该片段在文件中的顺序
	SegmentID string                // 文件片段的唯一标识
	Checksum  []byte                // 分片的校验和,用于校验分片数据的完整性和一致性
	IsRsCodes bool                  // 是否是纠删码片段
	Nodes     sync.Map              // 使用并发安全的 sync.Map 存储节点信息,键是节点ID (peer.ID),值是节点是否可用 (bool)
	Status    SegmentDownloadStatus // 下载状态
}

FileSegment 描述一个文件分片的详细信息及其下载状态

func (*FileSegment) AddNode

func (fs *FileSegment) AddNode(nodeID peer.ID, active bool)

AddNode 向 Nodes 中添加节点 参数:

  • nodeID: peer.ID 节点的ID
  • active: bool 节点是否可用

func (*FileSegment) DeleteNode

func (fs *FileSegment) DeleteNode(nodeID peer.ID)

DeleteNode 从 Nodes 中删除节点 参数:

  • nodeID: peer.ID 节点的ID

func (*FileSegment) GetNodeActive

func (fs *FileSegment) GetNodeActive(nodeID peer.ID) (bool, bool)

GetNodeActive 获取节点的可用状态 参数:

  • nodeID: peer.ID 节点的ID

返回值:

  • bool: 节点是否可用
  • bool: 如果节点存在,返回 true;否则返回 false

func (*FileSegment) GetNodes

func (fs *FileSegment) GetNodes() map[peer.ID]bool

GetNodes 返回 Nodes 中所有节点的副本 返回值:

  • map[peer.ID]bool: 所有节点的副本,键是节点ID,值是节点是否可用

func (*FileSegment) GetSegmentID

func (fs *FileSegment) GetSegmentID() string

GetSegmentID 返回文件分片的唯一标识

返回值:

  • string: 文件分片的唯一标识

func (*FileSegment) HasActiveNodes

func (fs *FileSegment) HasActiveNodes() bool

HasActiveNodes 检查是否存在可用节点 返回值:

  • bool: 是否存在可用节点

func (*FileSegment) HasNodes

func (fs *FileSegment) HasNodes() bool

HasNodes 检查节点信息是否存在 返回值:

  • bool: 是否存在节点信息

func (*FileSegment) IsCompleted

func (fs *FileSegment) IsCompleted() bool

IsCompleted 检查文件片段是否已经下载完成 返回值:

  • bool: 文件片段是否下载完成

func (*FileSegment) IsNodeActive

func (fs *FileSegment) IsNodeActive(nodeID peer.ID) bool

IsNodeActive 检查指定节点是否可用 参数:

  • nodeID: peer.ID 节点的ID

返回值:

  • bool: 节点是否可用

func (*FileSegment) IsNodesEmpty

func (fs *FileSegment) IsNodesEmpty() bool

IsNodesEmpty 检查节点信息是否为空 返回值:

  • bool: 节点信息是否为空

func (*FileSegment) IsStatus

func (fs *FileSegment) IsStatus(status SegmentDownloadStatus) bool

IsStatus 检查文件片段的状态 参数:

  • status: SegmentDownloadStatus 文件片段的下载状态

返回值:

  • bool: 如果文件片段的状态为指定状态,返回 true;否则返回 false

func (*FileSegment) NodeExists

func (fs *FileSegment) NodeExists(nodeID peer.ID) bool

NodeExists 检查节点是否存在 参数:

  • nodeID: peer.ID 节点的ID

返回值:

  • bool: 如果节点存在,返回 true;否则返回 false

func (*FileSegment) SetNodeInactive

func (fs *FileSegment) SetNodeInactive(nodeID peer.ID)

SetNodeInactive 将指定节点设置为不可用 参数:

  • nodeID: peer.ID 节点的ID

func (*FileSegment) SetStatus

func (fs *FileSegment) SetStatus(status SegmentDownloadStatus)

SetStatus 设置文件片段的下载状态 参数:

  • status: SegmentDownloadStatus 文件片段的下载状态

func (*FileSegment) UpdateNodes

func (fs *FileSegment) UpdateNodes(peers []peer.ID)

UpdateNodes 更新文件片段的节点信息 参数:

  • peers: []peer.ID 文件片段所在的节点ID。

默认情况下,所有提供的节点ID都将标记为可用。如果节点已存在但标记为不可用,会更新其状态为可用。

type HashTable

type HashTable struct {
	Checksum  []byte // 分片的校验和,用于校验分片数据的完整性和一致性
	IsRsCodes bool   // 标记该分片是否使用了纠删码技术,用于数据的恢复和冗余
}

HashTable 描述分片的校验和是否属于纠删码

type NewDownloadManagerInput

type NewDownloadManagerInput struct {
	fx.In
	LC  fx.Lifecycle
	Ctx context.Context // 全局上下文
	Opt *opts.Options   // 文件存储选项配置
	Afe afero.Afero     // 文件系统接口
	P2P *dep2p.DeP2P    // 网络主机
}

type NewDownloadManagerOutput

type NewDownloadManagerOutput struct {
	fx.Out
	Download *DownloadManager // 管理所有下载会话
}

func NewDownloadManager

func NewDownloadManager(input NewDownloadManagerInput) (out NewDownloadManagerOutput)

NewDownloadManager 创建并初始化一个新的 DownloadManager 实例。 参数:

  • input: NewDownloadManagerInput 用于初始化 DownloadManager 的输入结构体。

返回值:

  • NewDownloadManagerOutput: 包含 DownloadManager 的输出结构体。

type RegisterPubsubProtocolInput

type RegisterPubsubProtocolInput struct {
	fx.In
	Ctx      context.Context     // 全局上下文
	Opt      *opts.Options       // 文件存储选项配置
	Afe      afero.Afero         // 文件系统接口
	P2P      *dep2p.DeP2P        // DeP2P网络主机
	PubSub   *pubsub.DeP2PPubSub // DeP2P网络订阅
	Download *DownloadManager    // 管理所有下载任务
}

type RegisterStreamProtocolInput

type RegisterStreamProtocolInput struct {
	fx.In
	LC       fx.Lifecycle
	Ctx      context.Context     // 全局上下文
	Opt      *opts.Options       // 文件存储选项配置
	Afe      afero.Afero         // 文件系统接口
	P2P      *dep2p.DeP2P        // 网络主机
	PubSub   *pubsub.DeP2PPubSub // 网络订阅
	Download *DownloadManager    // 管理所有下载任务
}

type SegmentDownloadStatus

type SegmentDownloadStatus string

文件片段的下载状态

const (
	SegmentStatusPending     SegmentDownloadStatus = "pending"     // 待下载
	SegmentStatusDownloading SegmentDownloadStatus = "downloading" // 下载中
	SegmentStatusCompleted   SegmentDownloadStatus = "completed"   // 下载完成
	SegmentStatusFailed      SegmentDownloadStatus = "failed"      // 下载失败
)

type SegmentListRequest

type SegmentListRequest struct {
	TaskID       string            // 任务唯一标识
	FileID       string            // 文件唯一标识
	UserPubHash  []byte            // 用户的公钥哈希
	SegmentNodes map[int][]peer.ID // 文件片段所在节点
}

SegmentListRequest 描述请求文件片段清单的参数

type StreamAsyncDownloadRequest

type StreamAsyncDownloadRequest struct {
	TaskID      string         // 任务唯一标识
	FileID      string         // 文件唯一标识,用于在系统内部唯一区分文件
	SegmentInfo map[int][]byte // 文件片段的索引和内容的映射
}

StreamAsyncDownloadRequest 向指定的节点发送文件片段的请求消息

type StreamAsyncDownloadResponse

type StreamAsyncDownloadResponse struct {
	Segments map[int]string // 文件片段的索引和内容的映射
}

func RequestStreamAsyncDownload

func RequestStreamAsyncDownload(p2p *dep2p.DeP2P, receiver peer.ID, taskID, fileID string, segmentInfo map[int][]byte) (*StreamAsyncDownloadResponse, error)

RequestStreamAsyncDownload 向指定的节点发送文件片段

type StreamGetSliceToLocalRequest

type StreamGetSliceToLocalRequest struct {
	DownloadMaximumSize int64          // 下载最大回复大小
	UserPubHash         []byte         // 用户的公钥哈希
	TaskID              string         // 任务唯一标识
	FileID              string         // 文件唯一标识,用于在系统内部唯一区分文件
	PrioritySegment     int            // 优先下载的文件片段索引
	SegmentInfo         map[int]string // 文件片段的索引和唯一标识的映射
}

StreamGetSliceToLocalRequest 发送下载文件片段的任务到网络的请求消息

type StreamGetSliceToLocalResponse

type StreamGetSliceToLocalResponse struct {
	SegmentInfo map[int][]byte // 文件片段的索引和内容的映射
}

StreamGetSliceToLocalResponse 发送下载文件片段的任务到网络的响应消息

func ProcessDownloadRequest

func ProcessDownloadRequest(
	opt *opts.Options,
	afe afero.Afero,
	p2p *dep2p.DeP2P,
	pub *pubsub.DeP2PPubSub,
	download *DownloadManager,
	downloadMaximumSize int64,
	taskID string,
	fileID string,
	prioritySegment int,
	segmentInfo map[int]string,
	sender string,
) (*StreamGetSliceToLocalResponse, error)

ProcessDownloadRequest 处理下载请求 参数:

  • opt: *opts.Options 文件存储选项配置
  • afe: afero.Afero 文件系统接口
  • p2p: *dep2p.DeP2P 网络主机
  • pub: *pubsub.DeP2PPubSub 网络订阅
  • download: *DownloadManager 管理所有下载任务
  • downloadMaximumSize: int64 下载最大回复大小
  • taskID: string 任务唯一标识
  • fileID: string 文件唯一标识
  • prioritySegment: int 优先下载的文件片段索引
  • segmentInfo: map[int]string 文件片段的索引和唯一标识的映射
  • sender: string 请求方节点地址

返回值:

  • *StreamGetSliceToLocalResponse 下载回复结构体
  • error 错误信息

func RequestStreamGetSliceToLocal

func RequestStreamGetSliceToLocal(p2p *dep2p.DeP2P, receiver peer.ID, downloadMaximumSize int64, userPubHash []byte, taskID, fileID string, prioritySegment int, segmentInfo map[int]string) (*StreamGetSliceToLocalResponse, error)

RequestStreamGetSliceToLocal 向指定的节点发送请求以下载文件片段 参数:

  • p2p: *dep2p.DeP2P 表示 DeP2P 网络主机
  • receiver: peer.ID 目标节点的 ID
  • downloadMaximumSize: int64 下载最大回复大小
  • userPubHash: []byte 用户的公钥哈希
  • fileID: string 文件唯一标识
  • prioritySegment: int 优先下载的文件片段索引
  • segmentInfo: map[int]string 文件片段的索引和唯一标识的映射

返回值:

  • *StreamGetSliceToLocalResponse: 下载文件片段的响应消息
  • error: 如果发生错误,返回错误信息

type StreamProtocol

type StreamProtocol struct {
	Ctx      context.Context     // 全局上下文
	Opt      *opts.Options       // 文件存储选项配置
	Afe      afero.Afero         // 文件系统接口
	P2P      *dep2p.DeP2P        // 网络主机
	PubSub   *pubsub.DeP2PPubSub // 网络订阅
	Download *DownloadManager    // 管理所有下载任务
}

流协议

Jump to

Keyboard shortcuts

? : This menu
/ : Search site
f or F : Jump to
y or Y : Canonical URL