Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion drivers/teldrive/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ type chunkTask struct {
fileName string
chunkSize int64
reader io.ReadSeeker
ss stream.StreamSectionReaderIF
ss stream.StreamSectionReader
}

type CopyManager struct {
Expand Down
41 changes: 29 additions & 12 deletions internal/bootstrap/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,24 +99,41 @@ func InitConfig() {
if conf.Conf.MaxConcurrency > 0 {
net.DefaultConcurrencyLimit = &net.ConcurrencyLimit{Limit: conf.Conf.MaxConcurrency}
}
if conf.Conf.MaxBufferLimit < 0 {
m, _ := mem.VirtualMemory()
if m != nil {
conf.MaxBufferLimit = max(int(float64(m.Total)*0.05), 4*utils.MB)
conf.MaxBufferLimit -= conf.MaxBufferLimit % utils.MB

memStat, _ := mem.VirtualMemory()
if memStat != nil {
log.Infof("total memory: %dMB, available: %dMB", memStat.Total>>20, memStat.Available>>20)
}
if conf.Conf.MinFreeMemory < 1 {
if memStat != nil {
t := (memStat.Total >> 20) * 10 / 100
conf.MinFreeMemory = max(16, min(t, 1024)) << 20
} else {
conf.MaxBufferLimit = 16 * utils.MB
conf.MinFreeMemory = 16 * utils.MB
}
} else {
conf.MaxBufferLimit = conf.Conf.MaxBufferLimit * utils.MB
conf.MinFreeMemory = max(16, uint64(conf.Conf.MinFreeMemory)) << 20
}
log.Infof("max buffer limit: %dMB", conf.MaxBufferLimit/utils.MB)
if conf.Conf.MmapThreshold > 0 {
conf.MmapThreshold = conf.Conf.MmapThreshold * utils.MB
log.Infof("min free memory: %dMB", conf.MinFreeMemory>>20)

if conf.Conf.MaxBlockLimit < 0 {
if memStat != nil {
t := (memStat.Total >> 20) * 1 / 100
conf.MaxBlockLimit = max(4, min(uint64(t), 64)) << 20
} else {
conf.MaxBlockLimit = 16 * utils.MB
}
} else {
conf.MaxBlockLimit = uint64(conf.Conf.MaxBlockLimit) << 20
}
log.Infof("max block limit: %dMB", conf.MaxBlockLimit>>20)

if conf.Conf.CacheThreshold > 0 {
conf.CacheThreshold = uint(conf.Conf.CacheThreshold) << 20
} else {
conf.MmapThreshold = 0
conf.CacheThreshold = 0
}
log.Infof("mmap threshold: %dMB", conf.Conf.MmapThreshold)
log.Infof("cache threshold: %dMB", conf.CacheThreshold>>20)

if len(conf.Conf.Log.Filter.Filters) == 0 {
conf.Conf.Log.Filter.Enable = false
Expand Down
9 changes: 5 additions & 4 deletions internal/conf/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,8 +120,9 @@ type Config struct {
DistDir string `json:"dist_dir"`
Log LogConfig `json:"log" envPrefix:"LOG_"`
DelayedStart int `json:"delayed_start" env:"DELAYED_START"`
MaxBufferLimit int `json:"max_buffer_limitMB" env:"MAX_BUFFER_LIMIT_MB"`
MmapThreshold int `json:"mmap_thresholdMB" env:"MMAP_THRESHOLD_MB"`
MinFreeMemory int `json:"min_free_memoryMB" env:"MIN_FREE_MEMORY_MB"`
MaxBlockLimit int `json:"max_block_limitMB" env:"MAX_BLOCK_LIMIT_MB"`
CacheThreshold int `json:"cache_thresholdMB" env:"CACHE_THRESHOLD_MB"`
MaxConnections int `json:"max_connections" env:"MAX_CONNECTIONS"`
MaxConcurrency int `json:"max_concurrency" env:"MAX_CONCURRENCY"`
TlsInsecureSkipVerify bool `json:"tls_insecure_skip_verify" env:"TLS_INSECURE_SKIP_VERIFY"`
Expand Down Expand Up @@ -178,8 +179,8 @@ func DefaultConfig(dataDir string) *Config {
},
},
},
MaxBufferLimit: -1,
MmapThreshold: 4,
MaxBlockLimit: -1,
CacheThreshold: 4,
MaxConnections: 0,
MaxConcurrency: 64,
TlsInsecureSkipVerify: false,
Expand Down
10 changes: 6 additions & 4 deletions internal/conf/var.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,12 @@ var FilenameCharMap = make(map[string]string)
var PrivacyReg []*regexp.Regexp

var (
// 单个Buffer最大限制
MaxBufferLimit = 16 * 1024 * 1024
// 超过该阈值的Buffer将使用 mmap 分配,可主动释放内存
MmapThreshold = 4 * 1024 * 1024
// 单次内存、磁盘缓存的扩容最大限制,超过该阈值将分多次扩充
MaxBlockLimit uint64 = 16 * 1024 * 1024
// 超过该阈值的Buffer将使用HybridCache,可主动释放内存。
CacheThreshold uint = 4 * 1024 * 1024
// 最小空闲内存
MinFreeMemory uint64 = 16 * 1024 * 1024
)
var (
RawIndexHtml string
Expand Down
230 changes: 230 additions & 0 deletions internal/mem/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,230 @@
package mem

import (
"errors"
"io"
"os"

"github.com/OpenListTeam/OpenList/v4/internal/conf"
"github.com/OpenListTeam/OpenList/v4/pkg/buffer"
)

// 优先使用内存,失败后才使用文件。
// 线程不安全
type HybridCache struct {
mem LinearMemory
memOffset uint64
file *os.File
fileOffset uint64
blockSize uint64
}

func (hc *HybridCache) NextBlockWithSize(size uint64) buffer.Block {
if hc.file != nil {
if hc.fileOffset > 0 && hc.file.Truncate(int64(hc.fileOffset+size)) != nil {
return nil
}
base := hc.fileOffset
hc.fileOffset += size
fs := buffer.NewBlockAdapter(
io.NewOffsetWriter(hc.file, int64(base)),
io.NewSectionReader(hc.file, int64(base), int64(size)),
)
return fs
}
all, err := hc.mem.Reallocate(uint64(hc.memOffset + size))
if err == nil {
start := hc.memOffset
hc.memOffset += size
return buffer.NewByteBlock(all[start : start+size])
}
if err := hc.initFileCache(); err != nil {
return nil
}
return hc.NextBlockWithSize(size)
}

func (hc *HybridCache) NextBlock() buffer.Block {
return hc.NextBlockWithSize(hc.blockSize)
}

// func (hc *HybridCache) GetBlockSize() uint64 {
// return hc.blockSize
// }

func (hc *HybridCache) RollbackBlockWithSize(size uint64) {
if hc.fileOffset > size {
hc.fileOffset -= size
return
}
size -= hc.fileOffset
hc.fileOffset = 0
if hc.memOffset > size {
hc.memOffset -= size
return
}
hc.memOffset = 0
}

func (hc *HybridCache) RollbackBlock() {
hc.RollbackBlockWithSize(hc.blockSize)
}

func (hc *HybridCache) initFileCache() error {
f, err := os.CreateTemp(conf.Conf.TempDir, "file-*")
if err != nil {
return err
}
if err := f.Truncate(int64(hc.blockSize)); err != nil {
_, _ = f.Close(), os.Remove(f.Name())
return err
}
hc.file = f
return nil
}

func (hc *HybridCache) Close() error {
if hc.blockSize > 0 {
hc.blockSize = 0
var err error
if hc.mem != nil {
err = hc.mem.Free()
hc.mem = nil
}
if hc.file != nil {
err = errors.Join(err, hc.file.Close(), os.Remove(hc.file.Name()))
hc.file = nil
}
return err
}
return nil
}

func (hc *HybridCache) Size() int64 {
return int64(hc.memOffset + hc.fileOffset)
}

func (hc *HybridCache) ReadAt(p []byte, off int64) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
if off < 0 || off >= hc.Size() {
return 0, io.EOF
}
if off < int64(hc.memOffset) {
all, err := hc.mem.Reallocate(min(hc.memOffset, uint64(off)+uint64(len(p))))
if err != nil {
// 不可能失败
panic(err)
}
n = copy(p, all[off:])
if n == len(p) {
return n, nil
}
p = p[n:]
}

off += int64(n) - int64(hc.memOffset)
canRead := int64(hc.fileOffset) - off
if canRead <= 0 {
return n, io.EOF
}
nn, err := hc.file.ReadAt(p[:min(len(p), int(canRead))], off)
return n + nn, err
}

func (hc *HybridCache) WriteAt(p []byte, off int64) (n int, err error) {
if len(p) == 0 {
return 0, nil
}
if off < 0 || off >= hc.Size() {
return 0, io.ErrShortWrite
}

if off < int64(hc.memOffset) {
all, err := hc.mem.Reallocate(min(hc.memOffset, uint64(off)+uint64(len(p))))
if err != nil {
// 不可能失败
panic(err)
}
n = copy(all[off:], p)
Comment on lines +145 to +150
if n == len(p) {
return n, nil
}
p = p[n:]
}

off += int64(n) - int64(hc.memOffset)
canWrite := int64(hc.fileOffset) - off
if canWrite <= 0 {
return n, io.ErrShortWrite
}
nn, err := hc.file.WriteAt(p[:min(len(p), int(canWrite))], off)
return n + nn, err
}

// 优先使用内存,失败后才使用文件
// 线程不安全
func NewHybridCache(blockSize, maxMemorySize uint64) (*HybridCache, error) {
var err error
if conf.CacheThreshold > 0 {
var m LinearMemory
m, err = NewGuardedMemory(blockSize, maxMemorySize)
if err == nil {
return &HybridCache{mem: m, blockSize: blockSize}, nil
}
}
hc := &HybridCache{blockSize: blockSize}
if err2 := hc.initFileCache(); err2 != nil {
return nil, errors.Join(err, err2)
}
return hc, nil
}

var _ buffer.Block = (*HybridCache)(nil)

type HybridCacheReader struct {
hc *HybridCache
offset int64
}

func NewHybridCacheReader(hc *HybridCache) *HybridCacheReader {
return &HybridCacheReader{hc: hc}
}

func (hcr *HybridCacheReader) Size() int64 {
return hcr.hc.Size()
}

func (hcr *HybridCacheReader) Read(p []byte) (n int, err error) {
n, err = hcr.hc.ReadAt(p, hcr.offset)
if n > 0 {
hcr.offset += int64(n)
}
return n, err
}

func (hcr *HybridCacheReader) ReadAt(p []byte, off int64) (n int, err error) {
return hcr.hc.ReadAt(p, off)
}

func (hcr *HybridCacheReader) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
case io.SeekCurrent:
if offset == 0 {
return hcr.offset, nil
}
offset = hcr.offset + offset
case io.SeekEnd:
offset = hcr.Size() + offset
default:
return 0, errors.New("Seek: invalid whence")
}

if offset < 0 || offset > hcr.Size() {
return 0, errors.New("Seek: invalid offset")
}
hcr.offset = offset
return offset, nil
}
25 changes: 25 additions & 0 deletions internal/mem/mem_other.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//go:build !unix && !windows

package mem // import "github.com/ncruces/go-sqlite3/internal/alloc"
Comment thread
j2rong4cn marked this conversation as resolved.

func NewMemory(cap, max uint64) (LinearMemory, error) {
return &sliceMemory{buf: make([]byte, 0, cap)}, nil
}

type sliceMemory struct {
buf []byte
}

func (b *sliceMemory) Free() error {
b.buf = nil
return nil
}

func (b *sliceMemory) Reallocate(size uint64) ([]byte, error) {
if cap := uint64(cap(b.buf)); size > cap {
b.buf = append(b.buf[:cap], make([]byte, size-cap)...)
} else {
b.buf = b.buf[:size]
}
return b.buf, nil
}
Loading
Loading