Documentation
¶
Index ¶
- Constants
- Variables
- func HandleFileDownloadRequestPubSub(opt *opts.Options, afe afero.Afero, p2p *dep2p.DeP2P, ...)
- func HandleFileDownloadResponsePubSub(p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub, download *DownloadManager, ...)
- func LoadTasksFromFile(filePath string) (map[string]*DownloadTaskSerializable, error)
- func RegisterDownloadStreamProtocol(input RegisterStreamProtocolInput)
- func RegisterPubsubProtocol(lc fx.Lifecycle, input RegisterPubsubProtocolInput)
- func SaveTasksToFile(filePath string, tasks map[string]*DownloadTaskSerializable) error
- type AsyncDownload
- type DownloadChan
- type DownloadFile
- func (df *DownloadFile) AddSegment(index int, segment *FileSegment)
- func (df *DownloadFile) AddSegmentNodes(idx int, peers []peer.ID)
- func (df *DownloadFile) CheckMissingNodes() bool
- func (df *DownloadFile) DeleteSegment(index int)
- func (df *DownloadFile) DownloadCompleteCount() int
- func (df *DownloadFile) GetPendingSegmentsForNode(ID peer.ID) map[int]string
- func (df *DownloadFile) GetSegment(index int) (*FileSegment, bool)
- func (df *DownloadFile) GetSegmentID(index int) string
- func (df *DownloadFile) GetSegmentsToDownload() []int
- func (df *DownloadFile) IsRsCodes(index int) bool
- func (df *DownloadFile) IsSegmentCompleted(opt *opts.Options, afe afero.Afero, index int, subDir string) (bool, bool, error)
- func (df *DownloadFile) ListAllSegments() map[int]*FileSegment
- func (df *DownloadFile) SegmentCount() int
- func (df *DownloadFile) SetNodeInactive(ID peer.ID)
- func (df *DownloadFile) SetSegmentStatus(index int, status SegmentDownloadStatus)
- func (df *DownloadFile) UpdateSegmentNodes(index int, peers []peer.ID)
- func (df *DownloadFile) UpdateSegmentStatus(index int, status SegmentDownloadStatus)
- type DownloadManager
- func (manager *DownloadManager) CancelDownload(taskID string) error
- func (manager *DownloadManager) ChannelEvents(opt *opts.Options, afe afero.Afero, p2p *dep2p.DeP2P)
- func (manager *DownloadManager) ClearDownloadTask()
- func (manager *DownloadManager) ContinueDownload(taskID string) error
- func (manager *DownloadManager) GetDownloadChan() chan *DownloadChan
- func (manager *DownloadManager) NewDownload(opt *opts.Options, afe afero.Afero, p2p *dep2p.DeP2P, ...) (*DownloadSuccessInfo, error)
- func (manager *DownloadManager) PauseDownload(taskID string) error
- func (manager *DownloadManager) PeriodicSave(filePath string, interval time.Duration)
- func (manager *DownloadManager) ReceivePendingSegments(downloadMaximumSize int64, requesterAddress string, taskID, fileID string, ...)
- func (manager *DownloadManager) RegisterTask(opt *opts.Options, afe afero.Afero, p2p *dep2p.DeP2P, ...)
- func (manager *DownloadManager) SaveTasksToFileSingleChan()
- type DownloadStatus
- type DownloadSuccessInfo
- type DownloadTask
- func (task *DownloadTask) ChannelEvents(opt *opts.Options, afe afero.Afero, p2p *dep2p.DeP2P, ...)
- func (task *DownloadTask) ChannelEventsEventChecklist(p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub)
- func (task *DownloadTask) ChannelEventsEventDownSnippet(opt *opts.Options, afe afero.Afero, p2p *dep2p.DeP2P, ...)
- func (task *DownloadTask) ChannelEventsEventMergeFile(opt *opts.Options, afe afero.Afero, p2p *dep2p.DeP2P, ...)
- func (task *DownloadTask) ChannelEventsTickerChecklist()
- func (task *DownloadTask) ChannelEventsTickerDownSnippet()
- func (task *DownloadTask) ChannelEventsTickerMergeFile()
- func (task *DownloadTask) CheckDataSegmentsCompleted() bool
- func (task *DownloadTask) CheckForDownSnippet()
- func (task *DownloadTask) CheckForMergeFiles()
- func (task *DownloadTask) CheckForNewChecklist()
- func (task *DownloadTask) ChecklistDoneSingleChan()
- func (task *DownloadTask) DownSnippetDoneSingleChan()
- func (task *DownloadTask) DownloadTaskDoneSingleChan()
- func (task *DownloadTask) EventChecklistSingleChan()
- func (task *DownloadTask) EventDownSnippetChan(index int)
- func (task *DownloadTask) EventMergeFileSingleChan()
- func (task *DownloadTask) FromSerializable(serializable *DownloadTaskSerializable) error
- func (task *DownloadTask) GetChecklistTimeout() bool
- func (task *DownloadTask) GetDownSnippetTimeout() bool
- func (task *DownloadTask) GetDownloadStatus() DownloadStatus
- func (task *DownloadTask) MergeFileDoneSingleChan()
- func (task *DownloadTask) SetChecklistTimeout(status bool)
- func (task *DownloadTask) SetDownSnippetTimeout(status bool)
- func (task *DownloadTask) SetDownloadStatus(status DownloadStatus)
- func (task *DownloadTask) TickerChecklistSingleChan(parentCtx context.Context)
- func (task *DownloadTask) TickerDownSnippetSingleChan(parentCtx context.Context)
- func (task *DownloadTask) TickerMergeFileSingleChan(parentCtx context.Context)
- func (task *DownloadTask) ToSerializable() (*DownloadTaskSerializable, error)
- func (task *DownloadTask) UpdateDownloadPieceInfo(payload *FileDownloadResponseChecklistPayload, peerID peer.ID)
- func (task *DownloadTask) WaitFoDownloadStatusChange(targetStatuses ...DownloadStatus)
- func (task *DownloadTask) WaitForChecklistTimeoutChange()
- func (task *DownloadTask) WaitForDownSnippetTimeoutChange()
- func (task *DownloadTask) WaitForSpecificChecklistTimeoutStatus(targetStatus bool)
- func (task *DownloadTask) WaitForSpecificDownSnippetTimeoutStatus(targetStatus bool)
- type DownloadTaskSerializable
- type DownloadToLocal
- type FileDownloadResponseChecklistPayload
- type FileSegment
- func (fs *FileSegment) AddNode(nodeID peer.ID, active bool)
- func (fs *FileSegment) DeleteNode(nodeID peer.ID)
- func (fs *FileSegment) GetNodeActive(nodeID peer.ID) (bool, bool)
- func (fs *FileSegment) GetNodes() map[peer.ID]bool
- func (fs *FileSegment) GetSegmentID() string
- func (fs *FileSegment) HasActiveNodes() bool
- func (fs *FileSegment) HasNodes() bool
- func (fs *FileSegment) IsCompleted() bool
- func (fs *FileSegment) IsNodeActive(nodeID peer.ID) bool
- func (fs *FileSegment) IsNodesEmpty() bool
- func (fs *FileSegment) IsStatus(status SegmentDownloadStatus) bool
- func (fs *FileSegment) NodeExists(nodeID peer.ID) bool
- func (fs *FileSegment) SetNodeInactive(nodeID peer.ID)
- func (fs *FileSegment) SetStatus(status SegmentDownloadStatus)
- func (fs *FileSegment) UpdateNodes(peers []peer.ID)
- type HashTable
- type NewDownloadManagerInput
- type NewDownloadManagerOutput
- type RegisterPubsubProtocolInput
- type RegisterStreamProtocolInput
- type SegmentDownloadStatus
- type SegmentListRequest
- type StreamAsyncDownloadRequest
- type StreamAsyncDownloadResponse
- type StreamGetSliceToLocalRequest
- type StreamGetSliceToLocalResponse
- type StreamProtocol
Constants ¶
const ( // 定时任务,索引清单相关时间参数 CheckForNewChecklistInitialDelay = 1 * time.Second // 初次调用的延迟时间 CheckForNewChecklistInterval = 40 * time.Second // 定时循环的常规间隔时间 CheckForNewChecklistContextTimeout = 60 * time.Minute // 上下文超时时间 // 定时任务,下载文件片段相关时间参数 CheckForDownSnippetInterval = 1 * time.Minute // 定时循环的常规间隔时间 CheckForDownSnippetContextTimeout = 180 * time.Minute // 上下文超时时间 // 定时任务,合并文件相关时间参数 CheckForMergeFilesInterval = 10 * time.Second // 定时循环的常规间隔时间 CheckForMergeFilesContextTimeout = 200 * time.Minute // 上下文超时时间 // 通道操作的超时设置 TickerChannelTimeout = 20 * time.Second // 定时任务通道操作的超时时间 TickerDownTimeout = 50 * time.Second // 文件片段下载通道的超时时间 // 通过令牌桶机制限制同时发送的片段数量,避免通道满载和过多并发请求导致的压力。 TickerChannelBufferSize = 20 // 通道缓冲区大小 EventDownSnippetChannelSize = 100 // 通道容量常量 TokenBucketSize = 50 // 令牌桶容量常量 TokenBucketRefillInterval = 100 * time.Millisecond // 令牌桶补充间隔 )
Variables ¶
var ( // 文件下载请求清单(请求) PubSubDownloadChecklistRequestTopic = fmt.Sprintf("defs@pubsub/download/checklist/request/%s", version) // 文件下载请求清单(回应) PubSubDownloadChecklistResponseTopic = fmt.Sprintf("defs@pubsub/download/checklist/response/%s", version) )
var ( // 文件下载请求清单(回应) StreamDownloadChecklistResponseProtocol = fmt.Sprintf("defs@stream/download/checklist/response/%s", version) // 文件下载本地 StreamDownloadLocalProtocol = fmt.Sprintf("defs@stream/download/local/%s", version) // 异步发送文件 StreamAsyncDownloadProtocol = fmt.Sprintf("defs@stream/async/download/%s", version) )
Functions ¶
func HandleFileDownloadRequestPubSub ¶
func HandleFileDownloadRequestPubSub(opt *opts.Options, afe afero.Afero, p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub, res *streams.RequestMessage)
HandleFileDownloadRequestPubSub 处理文件下载请求
func HandleFileDownloadResponsePubSub ¶
func HandleFileDownloadResponsePubSub(p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub, download *DownloadManager, res *streams.RequestMessage)
HandleFileDownloadResponsePubSub 处理文件下载响应
func LoadTasksFromFile ¶
func LoadTasksFromFile(filePath string) (map[string]*DownloadTaskSerializable, error)
LoadTasksFromFile 从文件加载任务 参数:
- filePath: string 文件路径
返回值:
- map[string]*DownloadTaskSerializable: 任务映射表
- error: 如果发生错误,返回错误信息
func RegisterDownloadStreamProtocol ¶
func RegisterDownloadStreamProtocol(input RegisterStreamProtocolInput)
RegisterDownloadStreamProtocol 注册下载流
func RegisterPubsubProtocol ¶
func RegisterPubsubProtocol(lc fx.Lifecycle, input RegisterPubsubProtocolInput)
RegisterPubsubProtocol 注册订阅
func SaveTasksToFile ¶
func SaveTasksToFile(filePath string, tasks map[string]*DownloadTaskSerializable) error
SaveTasksToFile 将任务保存到文件 参数:
- filePath: string 文件路径
- tasks: map[string]*DownloadTaskSerializable 任务映射表
返回值:
- error: 如果发生错误,返回错误信息
Types ¶
type AsyncDownload ¶
type AsyncDownload struct {
// contains filtered or unexported fields
}
AsyncDownload 表示需要异步下载的文件片段信息
type DownloadChan ¶
type DownloadChan struct { TaskID string // 任务唯一标识 TotalPieces int // 文件总分片数 DownloadProgress int // 下载进度百分比 IsComplete bool // 下载任务是否完成 SegmentID string // 文件片段唯一标识 SegmentIndex int // 文件片段索引,表示该片段在文件中的顺序 SegmentSize int // 文件片段大小,单位为字节 UsesErasureCodes bool // 是否使用纠删码技术 NodeID peer.ID // 存储该文件片段的节点ID DownloadTime int64 // 下载完成时间的时间戳 }
DownloadChan 用于刷新下载任务的通道
type DownloadFile ¶
type DownloadFile struct { FileID string // 文件唯一标识 Name string // 文件名,包括扩展名,描述文件的名称 Size int64 // 文件大小,单位为字节,描述文件的总大小 ContentType string // MIME类型,表示文件的内容类型,如"text/plain" Segments sync.Map // 使用并发安全的 sync.Map 存储文件分片信息,键是分片索引 (int),值是指向 FileSegment 结构体的指针 (*FileSegment) }
DownloadFile 包含待下载文件的详细信息及其分片信息
func (*DownloadFile) AddSegment ¶
func (df *DownloadFile) AddSegment(index int, segment *FileSegment)
AddSegment 添加文件分片信息 参数:
- index: int 分片索引
- segment: *FileSegment 文件分片信息
func (*DownloadFile) AddSegmentNodes ¶
func (df *DownloadFile) AddSegmentNodes(idx int, peers []peer.ID)
AddSegmentNodes 添加文件片段所在的节点信息 参数:
- idx: int 文件片段的索引。
- peers: []peer.ID 文件片段所在的节点ID。
func (*DownloadFile) CheckMissingNodes ¶
func (df *DownloadFile) CheckMissingNodes() bool
CheckMissingNodes 检查文件信息中不是纠删码片段的片段,是否存在节点ID是空的 如果有存在节点ID为空的片段,返回true,表示还有切片的节点ID没有拿到,否则返回false 返回值:
- bool: 是否存在节点ID为空的片段
func (*DownloadFile) DeleteSegment ¶
func (df *DownloadFile) DeleteSegment(index int)
DeleteSegment 删除文件分片信息 参数:
- index: int 分片索引
func (*DownloadFile) DownloadCompleteCount ¶
func (df *DownloadFile) DownloadCompleteCount() int
DownloadCompleteCount 检查已经完成的数量 返回值:
- int: 已经完成的文件片段数量
func (*DownloadFile) GetPendingSegmentsForNode ¶
func (df *DownloadFile) GetPendingSegmentsForNode(ID peer.ID) map[int]string
GetPendingSegmentsForNode 获取特定节点下的待下载文件片段索引和唯一标识的映射 参数:
- ID: peer.ID 节点ID
返回值:
- map[int]string: 待下载文件片段的索引和唯一标识的映射
func (*DownloadFile) GetSegment ¶
func (df *DownloadFile) GetSegment(index int) (*FileSegment, bool)
GetSegment 获取文件分片信息 参数:
- index: int 分片索引
返回值:
- *FileSegment 文件分片信息
- bool 表示是否找到该索引的分片
func (*DownloadFile) GetSegmentID ¶
func (df *DownloadFile) GetSegmentID(index int) string
GetSegmentID 获取文件片段的唯一标识 参数:
- index: int 文件片段的索引
返回值:
- string 文件片段的唯一标识
func (*DownloadFile) GetSegmentsToDownload ¶
func (df *DownloadFile) GetSegmentsToDownload() []int
GetSegmentsToDownload 获取需要下载的文件片段索引 返回值:
- []int: 需要下载的文件片段索引数组
func (*DownloadFile) IsRsCodes ¶
func (df *DownloadFile) IsRsCodes(index int) bool
IsRsCodes 检查文件片段是否是纠删码片段 参数:
- index: int 文件片段的索引
返回值:
- bool 是否是纠删码片段
func (*DownloadFile) IsSegmentCompleted ¶
func (df *DownloadFile) IsSegmentCompleted(opt *opts.Options, afe afero.Afero, index int, subDir string) (bool, bool, error)
IsSegmentCompleted 检查文件片段是否存在以及是否已经下载完成 参数:
- opt: *opts.Options 文件存储选项配置
- afe: afero.Afero 文件系统接口
- index: int 文件片段的索引
- subDir: string 子目录路径
返回值:
- bool: 文件片段是否存在
- bool: 文件片段是否下载完成
- error: 错误信息
func (*DownloadFile) ListAllSegments ¶
func (df *DownloadFile) ListAllSegments() map[int]*FileSegment
ListAllSegments 列出所有文件分片信息 返回值:
- map[int]*FileSegment 所有文件分片信息的映射
func (*DownloadFile) SegmentCount ¶
func (df *DownloadFile) SegmentCount() int
SegmentCount 返回当前文件的分片数量 返回值:
- int: 当前文件的分片数量
func (*DownloadFile) SetNodeInactive ¶
func (df *DownloadFile) SetNodeInactive(ID peer.ID)
SetNodeInactive 将特定节点下的所有文件片段的节点状态设置为不可用 参数:
- ID: peer.ID 节点ID
func (*DownloadFile) SetSegmentStatus ¶
func (df *DownloadFile) SetSegmentStatus(index int, status SegmentDownloadStatus)
SetSegmentStatus 设置文件片段的下载状态 参数:
- index: int 文件片段的索引
- status: SegmentDownloadStatus 文件片段的下载状态
func (*DownloadFile) UpdateSegmentNodes ¶
func (df *DownloadFile) UpdateSegmentNodes(index int, peers []peer.ID)
UpdateSegmentNodes 更新文件分片的节点信息 参数:
- index: int 分片索引
- peers: []peer.ID 节点ID列表
func (*DownloadFile) UpdateSegmentStatus ¶
func (df *DownloadFile) UpdateSegmentStatus(index int, status SegmentDownloadStatus)
UpdateSegmentStatus 更新文件分片的下载状态 参数:
- index: int 分片索引
- status: SegmentDownloadStatus 新的下载状态
type DownloadManager ¶
type DownloadManager struct { Mu sync.Mutex // 用于保护状态的互斥锁 Tasks map[string]*DownloadTask // 下载任务的映射表 DownloadChan chan *DownloadChan // 下载状态更新通道,用于通知外部下载进度和状态 SaveTasksToFile chan struct{} // 保存任务至文件通道 AsyncDownload chan *AsyncDownload // 需要异步下载的文件片段信息 // contains filtered or unexported fields }
DownloadManager 管理所有下载任务,提供文件下载的统一入口和管理功能 它负责协调下载任务的执行,管理任务的生命周期,以及通知任务的状态更新和错误事件
func (*DownloadManager) CancelDownload ¶
func (manager *DownloadManager) CancelDownload( taskID string, ) error
CancelDownload 取消下载操作
func (*DownloadManager) ChannelEvents ¶
ChannelEvents 通道事件
func (*DownloadManager) ClearDownloadTask ¶
func (manager *DownloadManager) ClearDownloadTask()
ClearDownloadTask 清空文件的下载任务
func (*DownloadManager) ContinueDownload ¶
func (manager *DownloadManager) ContinueDownload(taskID string) error
ContinueDownload 继续下载操作 参数:
- taskID: string 任务唯一标识,用于区分和管理不同的下载任务
返回值:
- error 如果发生错误,返回错误信息
func (*DownloadManager) GetDownloadChan ¶
func (manager *DownloadManager) GetDownloadChan() chan *DownloadChan
GetDownloadChan 返回下载状态更新通道
func (*DownloadManager) NewDownload ¶
func (manager *DownloadManager) NewDownload( opt *opts.Options, afe afero.Afero, p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub, fileID string, ownerPriv *ecdsa.PrivateKey, segmentNodes ...map[int][]peer.ID, ) (*DownloadSuccessInfo, error)
NewDownload 新下载操作 参数:
- opt: *opts.Options 文件存储选项配置。
- afe: afero.Afero 文件系统接口。
- p2p: *dep2p.DeP2P 网络主机。
- pubsub: *pubsub.DeP2PPubSub 网络订阅。
- fileID: string 文件唯一标识。
- ownerPriv: *ecdsa.PrivateKey 所有者的私钥。
- segmentNodes: ...map[int][]peer.ID 文件片段所在节点。
返回值:
- *DownloadSuccessInfo: 文件下载成功后的返回信息。
- error: 如果发生错误,返回错误信息。
func (*DownloadManager) PauseDownload ¶
func (manager *DownloadManager) PauseDownload( taskID string, ) error
PauseDownload 暂停下载操作
func (*DownloadManager) PeriodicSave ¶
func (manager *DownloadManager) PeriodicSave(filePath string, interval time.Duration)
PeriodicSave 定时保存任务数据到文件 参数:
- filePath: string 文件路径
- interval: time.Duration 保存间隔
func (*DownloadManager) ReceivePendingSegments ¶
func (manager *DownloadManager) ReceivePendingSegments(downloadMaximumSize int64, requesterAddress string, taskID, fileID string, segments map[int]string)
ReceivePendingSegments 用于接收未回复的文件片段并将其传递给异步下载通道 参数:
- requesterAddress string 请求方节点地址
- pendingSegments map[int]string 剩余的文件片段(索引 -> 文件片段ID)
func (*DownloadManager) RegisterTask ¶
func (manager *DownloadManager) RegisterTask(opt *opts.Options, afe afero.Afero, p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub, task *DownloadTask)
RegisterTask 向管理器注册一个新的下载任务。 参数:
- task: *DownloadTask 为准备注册的下载任务。
func (*DownloadManager) SaveTasksToFileSingleChan ¶
func (manager *DownloadManager) SaveTasksToFileSingleChan()
SaveTasksToFileChan 保存任务至文件的通知通道
type DownloadStatus ¶
type DownloadStatus string
下载任务的状态
const ( StatusPending DownloadStatus = "pending" // 待下载 StatusDownloading DownloadStatus = "downloading" // 下载中 StatusCompleted DownloadStatus = "completed" // 下载完成 StatusFailed DownloadStatus = "failed" // 下载失败 StatusPaused DownloadStatus = "paused" // 下载暂停 )
type DownloadSuccessInfo ¶
type DownloadSuccessInfo struct { TaskID string // 任务唯一标识,用于区分和管理不同的下载任务 FileID string // 文件唯一标识,用于在系统内部唯一区分文件 DownloadTime int64 // 文件开始下载的时间 }
DownloadSuccessInfo 用于封装文件下载成功后的返回信息
type DownloadTask ¶
type DownloadTask struct { TaskID string // 任务唯一标识 File *DownloadFile // 待下载的文件信息 TotalPieces int // 文件总片数(数据片段和纠删码片段的总数) DataPieces int // 数据片段的数量 OwnerPriv *ecdsa.PrivateKey // 所有者的私钥 Secret []byte // 文件加密密钥 UserPubHash []byte // 用户的公钥哈希 Progress util.BitSet // 下载任务的进度,表示为0到100之间的百分比 CreatedAt int64 // 任务创建的时间戳 UpdatedAt int64 // 最后一次下载成功的时间戳 MergeCounter int // 用于跟踪文件合并操作的计数器 TickerChecklist chan struct{} // 定时任务,通知检查是否需要下载新的索引清单的通道 TickerDownSnippet chan struct{} // 定时任务,通知检查是否需要下载新的文件片段的通道 TickerMergeFile chan struct{} // 定时任务,通知检查是否需要执行文件合并操作的通道 ChecklistDone chan struct{} // 用于通知索引清单完成,关闭定时任务的通道 DownSnippetDone chan struct{} // 用于通知下载片段完成,关闭定时任务的通道 MergeFileDone chan struct{} // 用于通知文件合并完成,关闭定时任务的通道 EventChecklist chan struct{} // 通道事件,通知下载新的索引清单的通道 EventDownSnippet chan int // 通道事件,通知下载新的文件片段的通道 EventMergeFile chan struct{} // 通道事件,通知执行文件合并操作的通道 DownloadTaskDone chan struct{} // 用于通知文件下载任务完成的通道 DownloadStatus DownloadStatus // 下载任务的状态 StatusCond *sync.Cond // 用于状态变化的条件变量 ChecklistTimeout bool `optional:"false" default:"false"` // 索引清单超时,默认为 false,且为必填项 ChecklistStatusCond *sync.Cond // 用于索引清单状态变化的条件变量 DownSnippetTimeout bool `optional:"false" default:"false"` // 下载片段超时,默认为 false,且为必填项 DownSnippetStatusCond *sync.Cond // 用于片段下载状态变化的条件变量 // contains filtered or unexported fields }
DownloadTask 描述一个文件下载任务
func NewDownloadTask ¶
func NewDownloadTask(ctx context.Context, taskID string, fileID string, ownerPriv *ecdsa.PrivateKey) (*DownloadTask, error)
NewDownloadTask 创建并初始化一个新的DownloadTask实例。 参数:
- ctx: context.Context 上下文用于管理协程的生命周期。
- mu: *sync.Mutex 互斥锁用于保护状态。
- taskID: string 任务的唯一标识符。
- fileID: string 待下载的文件信息。
- ownerPriv: *ecdsa.PrivateKey 所有者的私钥。
返回值:
- *DownloadTask: 新创建的DownloadTask实例。
- error: 如果发生错误,返回错误信息。
func (*DownloadTask) ChannelEvents ¶
func (task *DownloadTask) ChannelEvents( opt *opts.Options, afe afero.Afero, p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub, manager *DownloadManager, )
ChannelEvents 通道事件处理 参数:
- opt: *opts.Options 文件存储选项配置,用于指定文件存储相关的选项。
- afe: afero.Afero 文件系统接口,用于文件操作。
- p2p: *dep2p.DeP2P DeP2P 网络主机,用于网络操作。
- pubsub: *pubsub.DeP2PPubSub DeP2P 网络订阅系统,用于发布和订阅消息。
- manager: *DownloadManager 管理下载任务的管理器,用于管理下载任务的相关操作。
func (*DownloadTask) ChannelEventsEventChecklist ¶
func (task *DownloadTask) ChannelEventsEventChecklist( p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub, )
ChannelEventsEventChecklist 通道事件处理下载新的索引清单 参数:
- p2p: *dep2p.DeP2P 表示 DeP2P 网络主机。
- pubsub: *pubsub.DeP2PPubSub 表示 DeP2P 网络订阅系统。
func (*DownloadTask) ChannelEventsEventDownSnippet ¶
func (task *DownloadTask) ChannelEventsEventDownSnippet( opt *opts.Options, afe afero.Afero, p2p *dep2p.DeP2P, pubsub *pubsub.DeP2PPubSub, downloadMaximumSize int64, index int, downloadChan chan *DownloadChan, )
ChannelEventsEventDownSnippet 通道事件处理下载新的文件片段 参数:
- opt: *opts.Options 文件存储选项配置
- afe: afero.Afero 文件系统接口
- p2p: *dep2p.DeP2P 表示 DeP2P 网络主机。
- pubsub: *pubsub.DeP2PPubSub 表示 DeP2P 网络订阅系统。
- downloadMaximumSize: int64 下载最大回复大小
- index: int 需要下载的文件片段索引
- downloadChan: chan *DownloadChan 下载状态更新通道
func (*DownloadTask) ChannelEventsEventMergeFile ¶
func (task *DownloadTask) ChannelEventsEventMergeFile(opt *opts.Options, afe afero.Afero, p2p *dep2p.DeP2P, downloadChan chan *DownloadChan)
ChannelEventsEventMergeFile 通道事件处理执行文件合并操作 参数:
- p2p: *dep2p.DeP2P 表示 DeP2P 网络主机。
- pubsub: *pubsub.DeP2PPubSub 表示 DeP2P 网络订阅系统。
func (*DownloadTask) ChannelEventsTickerChecklist ¶
func (task *DownloadTask) ChannelEventsTickerChecklist()
ChannelEventsTickerChecklist 通道事件处理检查是否需要下载新的索引清单
func (*DownloadTask) ChannelEventsTickerDownSnippet ¶
func (task *DownloadTask) ChannelEventsTickerDownSnippet()
func (*DownloadTask) ChannelEventsTickerMergeFile ¶
func (task *DownloadTask) ChannelEventsTickerMergeFile()
ChannelEventsTickerMergeFile 通道事件处理检查是否需要执行文件合并操作
func (*DownloadTask) CheckDataSegmentsCompleted ¶
func (task *DownloadTask) CheckDataSegmentsCompleted() bool
CheckDataSegmentsCompleted 检查数据片段是否下载完成 参数:无 返回值:
- bool: 如果数据片段下载完成返回true,否则返回false
func (*DownloadTask) CheckForDownSnippet ¶
func (task *DownloadTask) CheckForDownSnippet()
CheckForDownSnippet 定时任务,检查是否需要下载文件片段。
func (*DownloadTask) CheckForMergeFiles ¶
func (task *DownloadTask) CheckForMergeFiles()
CheckForMergeFiles 定时任务,检查是否需要合并文件。
func (*DownloadTask) CheckForNewChecklist ¶
func (task *DownloadTask) CheckForNewChecklist()
CheckForNewChecklist 定时任务,检查是否需要下载新的索引清单。
func (*DownloadTask) ChecklistDoneSingleChan ¶
func (task *DownloadTask) ChecklistDoneSingleChan()
ChecklistDoneSingleChan 向任务的通知索引清单完成,关闭定时任务的通道发送通知。 如果通道已满,会先丢弃旧消息再写入新消息,确保通道始终保持最新的通知。
func (*DownloadTask) DownSnippetDoneSingleChan ¶
func (task *DownloadTask) DownSnippetDoneSingleChan()
DownSnippetDoneSingleChan 向任务的通知下载片段完成,关闭定时任务的通道发送通知。 如果通道已满,会先丢弃旧消息再写入新消息,确保通道始终保持最新的通知。
func (*DownloadTask) DownloadTaskDoneSingleChan ¶
func (task *DownloadTask) DownloadTaskDoneSingleChan()
DownloadTaskDoneSingleChan 向任务的通知文件下载任务完成的通道发送通知。 如果通道已满,会先丢弃旧消息再写入新消息,确保通道始终保持最新的通知。
func (*DownloadTask) EventChecklistSingleChan ¶
func (task *DownloadTask) EventChecklistSingleChan()
EventChecklistSingleChan 向任务的通知下载新的索引清单的通道发送通知。 如果通道已满,会先丢弃旧消息再写入新消息,确保通道始终保持最新的通知。
func (*DownloadTask) EventDownSnippetChan ¶
func (task *DownloadTask) EventDownSnippetChan(index int)
EventDownSnippetChan 向任务的通知下载新的文件片段的通道写入索引,并处理通道已满的情况。 参数:
- index: int 文件片段的索引。
func (*DownloadTask) EventMergeFileSingleChan ¶
func (task *DownloadTask) EventMergeFileSingleChan()
EventMergeFileSingleChan 向任务的通知执行文件合并操作的通道发送通知。 如果通道已满,会先丢弃旧消息再写入新消息,确保通道始终保持最新的通知。
func (*DownloadTask) FromSerializable ¶
func (task *DownloadTask) FromSerializable(serializable *DownloadTaskSerializable) error
FromSerializable 从可序列化的结构体恢复 DownloadTask 参数:
- serializable: *DownloadTaskSerializable 可序列化的 DownloadTask 结构体
func (*DownloadTask) GetChecklistTimeout ¶
func (task *DownloadTask) GetChecklistTimeout() bool
GetChecklistTimeout 获取当前的索引清单状态。
func (*DownloadTask) GetDownSnippetTimeout ¶
func (task *DownloadTask) GetDownSnippetTimeout() bool
GetDownSnippetTimeout 获取当前的片段下载状态。
func (*DownloadTask) GetDownloadStatus ¶
func (task *DownloadTask) GetDownloadStatus() DownloadStatus
GetDownloadStatus 获取当前下载任务的状态
func (*DownloadTask) MergeFileDoneSingleChan ¶
func (task *DownloadTask) MergeFileDoneSingleChan()
MergeFileDoneSingleChan 向任务的通知文件合并完成,关闭定时任务的通道发送通知。 如果通道已满,会先丢弃旧消息再写入新消息,确保通道始终保持最新的通知。
func (*DownloadTask) SetChecklistTimeout ¶
func (task *DownloadTask) SetChecklistTimeout(status bool)
SetChecklistTimeout 设置索引清单状态,并通知所有等待索引清单状态变化的goroutine。
func (*DownloadTask) SetDownSnippetTimeout ¶
func (task *DownloadTask) SetDownSnippetTimeout(status bool)
SetDownSnippetTimeout 设置片段下载状态,并通知所有等待片段下载状态变化的goroutine。
func (*DownloadTask) SetDownloadStatus ¶
func (task *DownloadTask) SetDownloadStatus(status DownloadStatus)
SetDownloadStatus 设置下载任务的状态,并通知所有等待区块同步状态变化的goroutine。
func (*DownloadTask) TickerChecklistSingleChan ¶
func (task *DownloadTask) TickerChecklistSingleChan(parentCtx context.Context)
TickerChecklistSingleChan 向任务的通知检查是否需要下载新的索引清单的通道发送通知。 如果通道已满,会先丢弃旧消息再写入新消息,确保通道始终保持最新的通知。
func (*DownloadTask) TickerDownSnippetSingleChan ¶
func (task *DownloadTask) TickerDownSnippetSingleChan(parentCtx context.Context)
TickerDownSnippetSingleChan 向任务的通知检查是否需要下载新的文件片段的通道发送通知。 如果通道已满,会先丢弃旧消息再写入新消息,确保通道始终保持最新的通知。
func (*DownloadTask) TickerMergeFileSingleChan ¶
func (task *DownloadTask) TickerMergeFileSingleChan(parentCtx context.Context)
TickerMergeFileSingleChan 向任务的通知检查是否需要执行文件合并操作的通道发送通知。 如果通道已满,会先丢弃旧消息再写入新消息,确保通道始终保持最新的通知。
func (*DownloadTask) ToSerializable ¶
func (task *DownloadTask) ToSerializable() (*DownloadTaskSerializable, error)
ToSerializable 将 DownloadTask 转换为可序列化的结构体 返回值:
- *DownloadTaskSerializable: 可序列化的 DownloadTask 结构体
func (*DownloadTask) UpdateDownloadPieceInfo ¶
func (task *DownloadTask) UpdateDownloadPieceInfo(payload *FileDownloadResponseChecklistPayload, peerID peer.ID)
UpdateDownloadPieceInfo 用于更新下载任务中特定片段的节点信息。 参数:
- payload: *FileDownloadResponseChecklistPayload 下载响应的有效载荷。
- peerID: peer.ID 节点的ID。
func (*DownloadTask) WaitFoDownloadStatusChange ¶
func (task *DownloadTask) WaitFoDownloadStatusChange(targetStatuses ...DownloadStatus)
WaitFoDownloadStatusChange 阻塞当前goroutine,直到下载任务的状态发生变化。
func (*DownloadTask) WaitForChecklistTimeoutChange ¶
func (task *DownloadTask) WaitForChecklistTimeoutChange()
WaitForChecklistTimeoutChange 阻塞当前goroutine,直到索引清单状态发生变化。
func (*DownloadTask) WaitForDownSnippetTimeoutChange ¶
func (task *DownloadTask) WaitForDownSnippetTimeoutChange()
WaitForDownSnippetTimeoutChange 阻塞当前goroutine,直到片段下载状态发生变化。
func (*DownloadTask) WaitForSpecificChecklistTimeoutStatus ¶
func (task *DownloadTask) WaitForSpecificChecklistTimeoutStatus(targetStatus bool)
WaitForSpecificChecklistTimeoutStatus 阻塞当前goroutine,直到索引清单状态发生变化并达到期望值。
func (*DownloadTask) WaitForSpecificDownSnippetTimeoutStatus ¶
func (task *DownloadTask) WaitForSpecificDownSnippetTimeoutStatus(targetStatus bool)
WaitForSpecificDownSnippetTimeoutStatus 阻塞当前goroutine,直到片段下载状态发生变化并达到期望值。
type DownloadTaskSerializable ¶
type DownloadTaskSerializable struct { TaskID string `json:"task_id"` // 任务唯一标识 File *DownloadFile `json:"file"` // 待下载的文件信息 TotalPieces int `json:"total_pieces"` // 文件总片数(数据片段和纠删码片段的总数) DataPieces int `json:"data_pieces"` // 数据片段的数量 OwnerPriv []byte `json:"owner_priv"` // 所有者的私钥 Secret []byte `json:"secret"` // 文件加密密钥 UserPubHash []byte `json:"user_pub_hash"` // 用户的公钥哈希 Progress util.BitSet `json:"progress"` // 下载任务的进度,表示为0到100之间的百分比 CreatedAt int64 `json:"created_at"` // 任务创建的时间戳 UpdatedAt int64 `json:"updated_at"` // 最后一次下载成功的时间戳 MergeCounter int `json:"merge_counter"` // 用于跟踪文件合并操作的计数器 Status DownloadStatus `json:"status"` // 下载任务的状态 }
DownloadTaskSerializable 是 DownloadTask 的可序列化版本
type DownloadToLocal ¶
type FileDownloadResponseChecklistPayload ¶
type FileDownloadResponseChecklistPayload struct { TaskID string // 任务唯一标识 FileID string // 文件唯一标识 Name string // 文件名,包括扩展名,描述文件的名称 Size int64 // 文件大小,单位为字节,描述文件的总大小 ContentType string // MIME类型,表示文件的内容类型,如"text/plain" Checksum []byte // 文件的校验和 SliceTable map[int]*HashTable // 文件片段的哈希表,记录每个片段的哈希值,支持纠错和数据完整性验证 AvailableSlices []int // 本地存储的文件片段信息 }
文件下载响应(清单)
type FileSegment ¶
type FileSegment struct { Index int // 分片索引,表示该片段在文件中的顺序 SegmentID string // 文件片段的唯一标识 Checksum []byte // 分片的校验和,用于校验分片数据的完整性和一致性 IsRsCodes bool // 是否是纠删码片段 Nodes sync.Map // 使用并发安全的 sync.Map 存储节点信息,键是节点ID (peer.ID),值是节点是否可用 (bool) Status SegmentDownloadStatus // 下载状态 }
FileSegment 描述一个文件分片的详细信息及其下载状态
func (*FileSegment) AddNode ¶
func (fs *FileSegment) AddNode(nodeID peer.ID, active bool)
AddNode 向 Nodes 中添加节点 参数:
- nodeID: peer.ID 节点的ID
- active: bool 节点是否可用
func (*FileSegment) DeleteNode ¶
func (fs *FileSegment) DeleteNode(nodeID peer.ID)
DeleteNode 从 Nodes 中删除节点 参数:
- nodeID: peer.ID 节点的ID
func (*FileSegment) GetNodeActive ¶
func (fs *FileSegment) GetNodeActive(nodeID peer.ID) (bool, bool)
GetNodeActive 获取节点的可用状态 参数:
- nodeID: peer.ID 节点的ID
返回值:
- bool: 节点是否可用
- bool: 如果节点存在,返回 true;否则返回 false
func (*FileSegment) GetNodes ¶
func (fs *FileSegment) GetNodes() map[peer.ID]bool
GetNodes 返回 Nodes 中所有节点的副本 返回值:
- map[peer.ID]bool: 所有节点的副本,键是节点ID,值是节点是否可用
func (*FileSegment) GetSegmentID ¶
func (fs *FileSegment) GetSegmentID() string
GetSegmentID 返回文件分片的唯一标识
返回值:
- string: 文件分片的唯一标识
func (*FileSegment) HasActiveNodes ¶
func (fs *FileSegment) HasActiveNodes() bool
HasActiveNodes 检查是否存在可用节点 返回值:
- bool: 是否存在可用节点
func (*FileSegment) HasNodes ¶
func (fs *FileSegment) HasNodes() bool
HasNodes 检查节点信息是否存在 返回值:
- bool: 是否存在节点信息
func (*FileSegment) IsCompleted ¶
func (fs *FileSegment) IsCompleted() bool
IsCompleted 检查文件片段是否已经下载完成 返回值:
- bool: 文件片段是否下载完成
func (*FileSegment) IsNodeActive ¶
func (fs *FileSegment) IsNodeActive(nodeID peer.ID) bool
IsNodeActive 检查指定节点是否可用 参数:
- nodeID: peer.ID 节点的ID
返回值:
- bool: 节点是否可用
func (*FileSegment) IsNodesEmpty ¶
func (fs *FileSegment) IsNodesEmpty() bool
IsNodesEmpty 检查节点信息是否为空 返回值:
- bool: 节点信息是否为空
func (*FileSegment) IsStatus ¶
func (fs *FileSegment) IsStatus(status SegmentDownloadStatus) bool
IsStatus 检查文件片段的状态 参数:
- status: SegmentDownloadStatus 文件片段的下载状态
返回值:
- bool: 如果文件片段的状态为指定状态,返回 true;否则返回 false
func (*FileSegment) NodeExists ¶
func (fs *FileSegment) NodeExists(nodeID peer.ID) bool
NodeExists 检查节点是否存在 参数:
- nodeID: peer.ID 节点的ID
返回值:
- bool: 如果节点存在,返回 true;否则返回 false
func (*FileSegment) SetNodeInactive ¶
func (fs *FileSegment) SetNodeInactive(nodeID peer.ID)
SetNodeInactive 将指定节点设置为不可用 参数:
- nodeID: peer.ID 节点的ID
func (*FileSegment) SetStatus ¶
func (fs *FileSegment) SetStatus(status SegmentDownloadStatus)
SetStatus 设置文件片段的下载状态 参数:
- status: SegmentDownloadStatus 文件片段的下载状态
func (*FileSegment) UpdateNodes ¶
func (fs *FileSegment) UpdateNodes(peers []peer.ID)
UpdateNodes 更新文件片段的节点信息 参数:
- peers: []peer.ID 文件片段所在的节点ID。
默认情况下,所有提供的节点ID都将标记为可用。如果节点已存在但标记为不可用,会更新其状态为可用。
type HashTable ¶
type HashTable struct { Checksum []byte // 分片的校验和,用于校验分片数据的完整性和一致性 IsRsCodes bool // 标记该分片是否使用了纠删码技术,用于数据的恢复和冗余 }
HashTable 描述分片的校验和是否属于纠删码
type NewDownloadManagerInput ¶
type NewDownloadManagerOutput ¶
type NewDownloadManagerOutput struct { fx.Out Download *DownloadManager // 管理所有下载会话 }
func NewDownloadManager ¶
func NewDownloadManager(input NewDownloadManagerInput) (out NewDownloadManagerOutput)
NewDownloadManager 创建并初始化一个新的 DownloadManager 实例。 参数:
- input: NewDownloadManagerInput 用于初始化 DownloadManager 的输入结构体。
返回值:
- NewDownloadManagerOutput: 包含 DownloadManager 的输出结构体。
type SegmentDownloadStatus ¶
type SegmentDownloadStatus string
文件片段的下载状态
const ( SegmentStatusPending SegmentDownloadStatus = "pending" // 待下载 SegmentStatusDownloading SegmentDownloadStatus = "downloading" // 下载中 SegmentStatusCompleted SegmentDownloadStatus = "completed" // 下载完成 SegmentStatusFailed SegmentDownloadStatus = "failed" // 下载失败 )
type SegmentListRequest ¶
type SegmentListRequest struct { TaskID string // 任务唯一标识 FileID string // 文件唯一标识 UserPubHash []byte // 用户的公钥哈希 SegmentNodes map[int][]peer.ID // 文件片段所在节点 }
SegmentListRequest 描述请求文件片段清单的参数
type StreamAsyncDownloadRequest ¶
type StreamAsyncDownloadRequest struct { TaskID string // 任务唯一标识 FileID string // 文件唯一标识,用于在系统内部唯一区分文件 SegmentInfo map[int][]byte // 文件片段的索引和内容的映射 }
StreamAsyncDownloadRequest 向指定的节点发送文件片段的请求消息
type StreamGetSliceToLocalRequest ¶
type StreamGetSliceToLocalRequest struct { DownloadMaximumSize int64 // 下载最大回复大小 UserPubHash []byte // 用户的公钥哈希 TaskID string // 任务唯一标识 FileID string // 文件唯一标识,用于在系统内部唯一区分文件 PrioritySegment int // 优先下载的文件片段索引 SegmentInfo map[int]string // 文件片段的索引和唯一标识的映射 }
StreamGetSliceToLocalRequest 发送下载文件片段的任务到网络的请求消息
type StreamGetSliceToLocalResponse ¶
StreamGetSliceToLocalResponse 发送下载文件片段的任务到网络的响应消息
func ProcessDownloadRequest ¶
func ProcessDownloadRequest( opt *opts.Options, afe afero.Afero, p2p *dep2p.DeP2P, pub *pubsub.DeP2PPubSub, download *DownloadManager, downloadMaximumSize int64, taskID string, fileID string, prioritySegment int, segmentInfo map[int]string, sender string, ) (*StreamGetSliceToLocalResponse, error)
ProcessDownloadRequest 处理下载请求 参数:
- opt: *opts.Options 文件存储选项配置
- afe: afero.Afero 文件系统接口
- p2p: *dep2p.DeP2P 网络主机
- pub: *pubsub.DeP2PPubSub 网络订阅
- download: *DownloadManager 管理所有下载任务
- downloadMaximumSize: int64 下载最大回复大小
- taskID: string 任务唯一标识
- fileID: string 文件唯一标识
- prioritySegment: int 优先下载的文件片段索引
- segmentInfo: map[int]string 文件片段的索引和唯一标识的映射
- sender: string 请求方节点地址
返回值:
- *StreamGetSliceToLocalResponse 下载回复结构体
- error 错误信息
func RequestStreamGetSliceToLocal ¶
func RequestStreamGetSliceToLocal(p2p *dep2p.DeP2P, receiver peer.ID, downloadMaximumSize int64, userPubHash []byte, taskID, fileID string, prioritySegment int, segmentInfo map[int]string) (*StreamGetSliceToLocalResponse, error)
RequestStreamGetSliceToLocal 向指定的节点发送请求以下载文件片段 参数:
- p2p: *dep2p.DeP2P 表示 DeP2P 网络主机
- receiver: peer.ID 目标节点的 ID
- downloadMaximumSize: int64 下载最大回复大小
- userPubHash: []byte 用户的公钥哈希
- fileID: string 文件唯一标识
- prioritySegment: int 优先下载的文件片段索引
- segmentInfo: map[int]string 文件片段的索引和唯一标识的映射
返回值:
- *StreamGetSliceToLocalResponse: 下载文件片段的响应消息
- error: 如果发生错误,返回错误信息
type StreamProtocol ¶
type StreamProtocol struct { Ctx context.Context // 全局上下文 Opt *opts.Options // 文件存储选项配置 Afe afero.Afero // 文件系统接口 P2P *dep2p.DeP2P // 网络主机 PubSub *pubsub.DeP2PPubSub // 网络订阅 Download *DownloadManager // 管理所有下载任务 }
流协议