Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
063dd21
fix(quark): refactor upPart to use http.NewRequest
j2rong4cn Oct 11, 2025
fcfc2ee
fix(quark): improved upload handling
j2rong4cn Oct 11, 2025
20da567
fix(quark_open): improved upload handling
j2rong4cn Oct 11, 2025
c19203f
fix: add retry context to multiple upload functions
j2rong4cn Oct 11, 2025
794c642
fix: optimize hash calculation in multipart upload to avoid blocking
j2rong4cn Oct 12, 2025
90cc233
fix: update error handling in lifecycle functions for better clarity
j2rong4cn Oct 12, 2025
6c5e4e4
fix: update upload progress calculation to improve accuracy
j2rong4cn Oct 12, 2025
188ebe7
fix: simplify error handling in lifecycle functions for improved read…
j2rong4cn Oct 12, 2025
c5cc6cb
fix: remove unnecessary mutex for part uploads to simplify code
j2rong4cn Oct 12, 2025
41e21c1
fix(stream): simplify file handling in NewStreamSectionReader and imp…
j2rong4cn Oct 8, 2025
3fcf6e6
fix(terabox): optimize chunk count calculation in Put method
j2rong4cn Oct 18, 2025
c64f5a6
Merge remote-tracking branch 'upstream/main' into pr/refactor2
j2rong4cn Oct 23, 2025
b037cab
Merge remote-tracking branch 'upstream/main' into pr/refactor2
j2rong4cn Nov 7, 2025
573ff34
perf(chaoxing): 表单上传文件0拷贝
j2rong4cn Nov 8, 2025
4384498
fix(cnb_releases): improve file upload progress tracking
j2rong4cn Nov 8, 2025
b2ead7e
Merge remote-tracking branch 'upstream/main' into pr/refactor2
j2rong4cn Nov 10, 2025
1158af0
fix(baidu_netdisk): improve upload handling
j2rong4cn Nov 10, 2025
f6a3568
fix(upload): optimize buffer initialization for file uploads
j2rong4cn Nov 10, 2025
f06adf9
.
j2rong4cn Nov 12, 2025
3e4151b
Merge remote-tracking branch 'upstream/main' into pr/refactor2
j2rong4cn Nov 24, 2025
0827180
.
j2rong4cn Nov 24, 2025
b88980f
.
j2rong4cn Nov 24, 2025
1e3d6f0
fix(baidu_netdisk): add retry condition to skip ErrUploadIDExpired in…
j2rong4cn Nov 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions drivers/115_open/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,16 +107,16 @@ func (d *Open115) multpartUpload(ctx context.Context, stream model.FileStreamer,
if err != nil {
return err
}
rateLimitedRd := driver.NewLimitedUploadStream(ctx, rd)
err = retry.Do(func() error {
rd.Seek(0, io.SeekStart)
part, err := bucket.UploadPart(imur, rateLimitedRd, partSize, int(i))
part, err := bucket.UploadPart(imur, driver.NewLimitedUploadStream(ctx, rd), partSize, int(i))
if err != nil {
return err
}
parts[i-1] = part
return nil
},
retry.Context(ctx),
retry.Attempts(3),
retry.DelayType(retry.BackOffDelay),
retry.Delay(time.Second))
Expand Down
23 changes: 7 additions & 16 deletions drivers/123/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,27 +125,18 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
curSize = lastChunkSize
}
var reader io.ReadSeeker
var rateLimitedRd io.Reader
threadG.GoWithLifecycle(errgroup.Lifecycle{
Before: func(ctx context.Context) error {
if reader == nil {
var err error
reader, err = ss.GetSectionReader(offset, curSize)
if err != nil {
return err
}
rateLimitedRd = driver.NewLimitedUploadStream(ctx, reader)
}
return nil
Before: func(ctx context.Context) (err error) {
reader, err = ss.GetSectionReader(offset, curSize)
return
},
Do: func(ctx context.Context) error {
Do: func(ctx context.Context) (err error) {
reader.Seek(0, io.SeekStart)
uploadUrl := s3PreSignedUrls.Data.PreSignedUrls[strconv.Itoa(cur)]
if uploadUrl == "" {
return fmt.Errorf("upload url is empty, s3PreSignedUrls: %+v", s3PreSignedUrls)
}
reader.Seek(0, io.SeekStart)
req, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadUrl, rateLimitedRd)
req, err := http.NewRequestWithContext(ctx, http.MethodPut, uploadUrl, driver.NewLimitedUploadStream(ctx, reader))
if err != nil {
return err
}
Expand All @@ -157,7 +148,7 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
}
defer res.Body.Close()
if res.StatusCode == http.StatusForbidden {
singleflight.AnyGroup.Do(fmt.Sprintf("Pan123.newUpload_%p", threadG), func() (any, error) {
_, err, _ = singleflight.AnyGroup.Do(fmt.Sprintf("Pan123.newUpload_%p", threadG), func() (any, error) {
newS3PreSignedUrls, err := getS3UploadUrl(ctx, upReq, cur, end)
if err != nil {
return nil, err
Expand All @@ -177,7 +168,7 @@ func (d *Pan123) newUpload(ctx context.Context, upReq *UploadResp, file model.Fi
}
return fmt.Errorf("upload s3 chunk %d failed, status code: %d, body: %s", cur, res.StatusCode, body)
}
progress := 10.0 + 85.0*float64(threadG.Success())/float64(chunkCount)
progress := 100 * float64(threadG.Success()+1) / float64(chunkCount+1)
up(progress)
return nil
},
Expand Down
32 changes: 14 additions & 18 deletions drivers/123_open/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,25 +73,20 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
// 表单
b := bytes.NewBuffer(make([]byte, 0, 2048))
threadG.GoWithLifecycle(errgroup.Lifecycle{
Before: func(ctx context.Context) error {
if reader == nil {
var err error
// 每个分片一个reader
reader, err = ss.GetSectionReader(offset, size)
if err != nil {
return err
}
// 计算当前分片的MD5
Before: func(ctx context.Context) (err error) {
reader, err = ss.GetSectionReader(offset, size)
return
},
Do: func(ctx context.Context) (err error) {
reader.Seek(0, io.SeekStart)
if sliceMD5 == "" {
// 把耗时的计算放在这里,避免阻塞其他协程
sliceMD5, err = utils.HashReader(utils.MD5, reader)
if err != nil {
return err
}
reader.Seek(0, io.SeekStart)
}
return nil
},
Do: func(ctx context.Context) error {
// 重置分片reader位置,因为HashReader、上一次失败已经读取到分片EOF
reader.Seek(0, io.SeekStart)

b.Reset()
w := multipart.NewWriter(b)
Expand Down Expand Up @@ -140,20 +135,21 @@ func (d *Open123) Upload(ctx context.Context, file model.FileStreamer, createRes
if res.StatusCode != 200 {
return fmt.Errorf("slice %d upload failed, status code: %d", partNumber, res.StatusCode)
}
var resp BaseResp
respBody, err := io.ReadAll(res.Body)
b.Reset()
_, err = b.ReadFrom(res.Body)
if err != nil {
return err
}
err = json.Unmarshal(respBody, &resp)
var resp BaseResp
err = json.Unmarshal(b.Bytes(), &resp)
if err != nil {
return err
}
if resp.Code != 0 {
return fmt.Errorf("slice %d upload failed: %s", partNumber, resp.Message)
}

progress := 10.0 + 85.0*float64(threadG.Success())/float64(uploadNums)
progress := 100 * float64(threadG.Success()+1) / float64(uploadNums+1)
up(progress)
return nil
},
Expand Down
42 changes: 19 additions & 23 deletions drivers/189pc/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -756,30 +756,24 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo
}
partInfo := ""
var reader io.ReadSeeker
var rateLimitedRd io.Reader
threadG.GoWithLifecycle(errgroup.Lifecycle{
Before: func(ctx context.Context) error {
if reader == nil {
var err error
reader, err = ss.GetSectionReader(offset, partSize)
if err != nil {
return err
}
silceMd5.Reset()
w, err := utils.CopyWithBuffer(writers, reader)
if w != partSize {
return fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", partSize, w, err)
}
// 计算块md5并进行hex和base64编码
md5Bytes := silceMd5.Sum(nil)
silceMd5Hexs = append(silceMd5Hexs, strings.ToUpper(hex.EncodeToString(md5Bytes)))
partInfo = fmt.Sprintf("%d-%s", i, base64.StdEncoding.EncodeToString(md5Bytes))

rateLimitedRd = driver.NewLimitedUploadStream(ctx, reader)
Before: func(ctx context.Context) (err error) {
reader, err = ss.GetSectionReader(offset, partSize)
if err != nil {
return err
}
silceMd5.Reset()
w, err := utils.CopyWithBuffer(writers, reader)
if w != partSize {
return fmt.Errorf("failed to read all data: (expect =%d, actual =%d) %w", partSize, w, err)
}
// 计算块md5并进行hex和base64编码
md5Bytes := silceMd5.Sum(nil)
silceMd5Hexs = append(silceMd5Hexs, strings.ToUpper(hex.EncodeToString(md5Bytes)))
partInfo = fmt.Sprintf("%d-%s", i, base64.StdEncoding.EncodeToString(md5Bytes))
return nil
},
Do: func(ctx context.Context) error {
Do: func(ctx context.Context) (err error) {
reader.Seek(0, io.SeekStart)
uploadUrls, err := y.GetMultiUploadUrls(ctx, isFamily, initMultiUpload.Data.UploadFileID, partInfo)
if err != nil {
Expand All @@ -788,11 +782,11 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo

// step.4 上传切片
uploadUrl := uploadUrls[0]
_, err = y.put(ctx, uploadUrl.RequestURL, uploadUrl.Headers, false, rateLimitedRd, isFamily)
_, err = y.put(ctx, uploadUrl.RequestURL, uploadUrl.Headers, false, driver.NewLimitedUploadStream(ctx, reader), isFamily)
if err != nil {
return err
}
up(float64(threadG.Success()) * 100 / float64(count))
up(float64(threadG.Success()+1) * 100 / float64(count+1))
return nil
},
After: func(err error) {
Expand All @@ -804,6 +798,7 @@ func (y *Cloud189PC) StreamUpload(ctx context.Context, dstDir model.Obj, file mo
if err = threadG.Wait(); err != nil {
return nil, err
}
defer up(100)

if fileMd5 != nil {
fileMd5Hex = strings.ToUpper(hex.EncodeToString(fileMd5.Sum(nil)))
Expand Down Expand Up @@ -995,7 +990,7 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode
return err
}

up(float64(threadG.Success()) * 100 / float64(len(uploadUrls)))
up(float64(threadG.Success()+1) * 100 / float64(len(uploadUrls)+1))
uploadProgress.UploadParts[i] = ""
return nil
})
Expand All @@ -1007,6 +1002,7 @@ func (y *Cloud189PC) FastUpload(ctx context.Context, dstDir model.Obj, file mode
}
return nil, err
}
defer up(100)
}

// step.5 提交
Expand Down
4 changes: 2 additions & 2 deletions drivers/aliyundrive_open/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,11 @@ func (d *AliyundriveOpen) upload(ctx context.Context, dstDir model.Obj, stream m
if err != nil {
return nil, err
}
rateLimitedRd := driver.NewLimitedUploadStream(ctx, rd)
err = retry.Do(func() error {
rd.Seek(0, io.SeekStart)
return d.uploadPart(ctx, rateLimitedRd, createResp.PartInfoList[i])
return d.uploadPart(ctx, driver.NewLimitedUploadStream(ctx, rd), createResp.PartInfoList[i])
},
retry.Context(ctx),
retry.Attempts(3),
retry.DelayType(retry.BackOffDelay),
retry.Delay(time.Second))
Expand Down
Loading