feat: http readseeker set content total length

pull/254/head
zijiren233 9 months ago
parent 4058e6f36e
commit 11a66c7f7b

@ -20,6 +20,7 @@ import (
// Cache defines the interface for cache implementations
type Cache interface {
Get(key string) (*CacheItem, bool, error)
GetAnyWithPrefix(prefix string) (*CacheItem, bool, error)
Set(key string, data *CacheItem) error
}
@ -136,10 +137,24 @@ func (i *CacheItem) ReadFrom(r io.Reader) (int64, error) {
return read, nil
}
// TrieNode represents a node in the prefix tree
type TrieNode struct {
	// children maps the next rune to the subtree of keys sharing this prefix.
	children map[rune]*TrieNode
	// key holds the complete cache key when isEnd is true; cleared on eviction.
	key string
	// isEnd marks that a complete key terminates at this node.
	isEnd bool
}
// NewTrieNode allocates an empty trie node with an initialized child map,
// ready to have keys inserted beneath it.
func NewTrieNode() *TrieNode {
	node := &TrieNode{}
	node.children = map[rune]*TrieNode{}
	return node
}
// MemoryCache implements an in-memory Cache with LRU eviction
type MemoryCache struct {
m map[string]*dllist.Element[*cacheEntry]
lruList *dllist.Dllist[*cacheEntry]
prefixTrie *TrieNode
capacity int
maxSizeBytes int64
currentSize int64
@ -162,9 +177,10 @@ type cacheEntry struct {
func NewMemoryCache(capacity int, opts ...MemoryCacheOption) *MemoryCache {
mc := &MemoryCache{
m: make(map[string]*dllist.Element[*cacheEntry]),
lruList: dllist.New[*cacheEntry](),
capacity: capacity,
m: make(map[string]*dllist.Element[*cacheEntry]),
lruList: dllist.New[*cacheEntry](),
capacity: capacity,
prefixTrie: NewTrieNode(),
}
for _, opt := range opts {
opt(mc)
@ -194,6 +210,47 @@ func (c *MemoryCache) Get(key string) (*CacheItem, bool, error) {
return item, true, nil
}
// GetAnyWithPrefix returns an arbitrary cached item whose key starts with
// prefix. It reports false with a nil error when no key matches; which item
// is returned is unspecified when several keys share the prefix.
func (c *MemoryCache) GetAnyWithPrefix(prefix string) (*CacheItem, bool, error) {
	if prefix == "" {
		return nil, false, fmt.Errorf("prefix cannot be empty")
	}

	c.mu.RLock()
	defer c.mu.RUnlock()

	// Descend the trie to the node representing the full prefix.
	cur := c.prefixTrie
	for _, r := range prefix {
		next, ok := cur.children[r]
		if !ok {
			return nil, false, nil
		}
		cur = next
	}

	// Depth-first search below the prefix node for the first node that
	// terminates a complete key.
	var firstKey func(*TrieNode) string
	firstKey = func(n *TrieNode) string {
		if n.isEnd {
			return n.key
		}
		for _, sub := range n.children {
			if k := firstKey(sub); k != "" {
				return k
			}
		}
		return ""
	}

	k := firstKey(cur)
	if k == "" {
		return nil, false, nil
	}
	el, ok := c.m[k]
	if !ok {
		// Trie and map out of sync; treat as a miss.
		return nil, false, nil
	}
	return el.Value.item, true, nil
}
func (c *MemoryCache) Set(key string, data *CacheItem) error {
if key == "" {
return fmt.Errorf("cache key cannot be empty")
@ -234,6 +291,14 @@ func (c *MemoryCache) Set(key string, data *CacheItem) error {
c.currentSize -= entry.size
delete(c.m, entry.key)
c.lruList.Remove(back)
// Remove from prefix tree
node := c.prefixTrie
for _, ch := range entry.key {
node = node.children[ch]
}
node.isEnd = false
node.key = ""
}
}
@ -242,11 +307,26 @@ func (c *MemoryCache) Set(key string, data *CacheItem) error {
element := c.lruList.PushFront(newEntry)
c.m[key] = element
c.currentSize += newSize
// Add to prefix tree
node := c.prefixTrie
for _, ch := range key {
if next, ok := node.children[ch]; ok {
node = next
} else {
node.children[ch] = NewTrieNode()
node = node.children[ch]
}
}
node.isEnd = true
node.key = key
return nil
}
type FileCache struct {
mu *ksync.Krwmutex
memCache *MemoryCache
filePath string
maxSizeBytes int64
currentSize atomic.Int64
@ -276,6 +356,8 @@ func NewFileCache(filePath string, opts ...FileCacheOption) *FileCache {
filePath: filePath,
mu: ksync.DefaultKrwmutex(),
maxAge: 24 * time.Hour, // Default 1 day
// Initialize memory cache with 1000 items capacity
memCache: NewMemoryCache(1000, WithMaxSizeBytes(100*1024*1024)), // 100MB memory cache
}
for _, opt := range opts {
@ -395,6 +477,11 @@ func (c *FileCache) Get(key string) (*CacheItem, bool, error) {
return nil, false, fmt.Errorf("cache key cannot be empty")
}
// Try memory cache first
if item, found, err := c.memCache.Get(key); err == nil && found {
return item, true, nil
}
prefix := string(key[0])
filePath := filepath.Join(c.filePath, prefix, key)
@ -423,9 +510,50 @@ func (c *FileCache) Get(key string) (*CacheItem, bool, error) {
return nil, false, fmt.Errorf("failed to read cache item: %w", err)
}
// Store in memory cache
c.memCache.Set(key, item)
return item, true, nil
}
// GetAnyWithPrefix returns an arbitrary cached item whose key starts with
// prefix, consulting the in-memory cache first and falling back to scanning
// the on-disk cache directory. It returns (nil, false, nil) when no matching
// key exists; which item is returned is unspecified when several keys match.
func (c *FileCache) GetAnyWithPrefix(prefix string) (*CacheItem, bool, error) {
	if prefix == "" {
		return nil, false, fmt.Errorf("prefix cannot be empty")
	}

	// Try memory cache first; fall through to disk on miss or error.
	if item, found, err := c.memCache.GetAnyWithPrefix(prefix); err == nil && found {
		return item, true, nil
	}

	// The on-disk layout shards keys into a directory named after the key's
	// first character, so every key with this prefix lives in one directory.
	dirPath := filepath.Join(c.filePath, string(prefix[0]))

	entries, err := os.ReadDir(dirPath)
	if err != nil {
		if os.IsNotExist(err) {
			return nil, false, nil
		}
		return nil, false, fmt.Errorf("failed to read directory: %w", err)
	}

	for _, entry := range entries {
		if entry.IsDir() {
			continue
		}
		if !strings.HasPrefix(entry.Name(), prefix) {
			continue
		}
		// Best effort: Get also populates the memory cache. Skip entries
		// that fail to load rather than aborting the whole scan.
		if item, found, err := c.Get(entry.Name()); err == nil && found {
			return item, true, nil
		}
	}
	return nil, false, nil
}
func (c *FileCache) Set(key string, data *CacheItem) error {
if key == "" {
return fmt.Errorf("cache key cannot be empty")
@ -434,6 +562,11 @@ func (c *FileCache) Set(key string, data *CacheItem) error {
return fmt.Errorf("cannot cache nil CacheItem")
}
// Store in memory cache first
if err := c.memCache.Set(key, data); err != nil {
return err
}
// Check and cleanup if needed
maxSize := c.maxSizeBytes
if maxSize > 0 {

@ -97,7 +97,10 @@ func ProxyURL(ctx *gin.Context, u string, headers map[string]string, cache bool)
}
if cache && settings.ProxyCacheEnable.Get() {
c, cancel := context.WithCancel(ctx)
defer cancel()
rsc := NewHttpReadSeekCloser(u,
WithContext(c),
WithHeadersMap(headers),
WithNotSupportRange(ctx.GetHeader("Range") == ""),
)

@ -31,7 +31,7 @@ type HttpReadSeekCloser struct {
notAllowedStatusCodes []int
allowedStatusCodes []int
offset int64
contentLength int64
contentTotalLength int64
length int64
currentRespMaxOffset int64
notSupportRange bool
@ -87,10 +87,10 @@ func WithContext(ctx context.Context) HttpReadSeekerConf {
}
}
func WithContentLength(contentLength int64) HttpReadSeekerConf {
func WithContentTotalLength(contentTotalLength int64) HttpReadSeekerConf {
return func(h *HttpReadSeekCloser) {
if contentLength >= 0 {
h.contentLength = contentLength
if contentTotalLength >= 0 {
h.contentTotalLength = contentTotalLength
}
}
}
@ -135,14 +135,13 @@ func WithNotSupportRange(notSupportRange bool) HttpReadSeekerConf {
func NewHttpReadSeekCloser(url string, conf ...HttpReadSeekerConf) *HttpReadSeekCloser {
rs := &HttpReadSeekCloser{
url: url,
contentLength: -1,
method: http.MethodGet,
headMethod: http.MethodHead,
length: 1024 * 1024 * 16,
headers: make(http.Header),
ctx: context.Background(),
client: http.DefaultClient,
url: url,
contentTotalLength: -1,
method: http.MethodGet,
headMethod: http.MethodHead,
length: 1024 * 1024 * 16,
headers: make(http.Header),
client: http.DefaultClient,
}
for _, c := range conf {
@ -219,7 +218,7 @@ func (h *HttpReadSeekCloser) Read(p []byte) (n int, err error) {
func (h *HttpReadSeekCloser) FetchNextChunk() error {
h.closeCurrentResp()
if h.contentLength > 0 && h.offset >= h.contentLength {
if h.contentTotalLength > 0 && h.offset >= h.contentTotalLength {
return io.EOF
}
@ -244,14 +243,14 @@ func (h *HttpReadSeekCloser) FetchNextChunk() error {
return fmt.Errorf("server does not support range requests, cannot seek to non-zero offset")
}
h.notSupportRange = true
h.contentLength = resp.ContentLength
h.currentRespMaxOffset = h.contentLength - 1
h.contentTotalLength = resp.ContentLength
h.currentRespMaxOffset = h.contentTotalLength - 1
h.currentResp = resp
return nil
}
// if the content length is not known, it may be because the requested length is too long, and a new request is needed
if h.contentLength < 0 {
h.contentLength = resp.ContentLength
if h.contentTotalLength < 0 {
h.contentTotalLength = resp.ContentLength
resp.Body.Close()
return h.FetchNextChunk()
}
@ -261,7 +260,7 @@ func (h *HttpReadSeekCloser) FetchNextChunk() error {
return fmt.Errorf("server does not support range requests, cannot seek to offset %d", h.offset)
}
h.notSupportRange = true
h.currentRespMaxOffset = h.contentLength - 1
h.currentRespMaxOffset = h.contentTotalLength - 1
h.currentResp = resp
return nil
}
@ -278,7 +277,7 @@ func (h *HttpReadSeekCloser) FetchNextChunk() error {
contentTotalLength, err := ParseContentRangeTotalLength(resp.Header.Get("Content-Range"))
if err == nil && contentTotalLength > 0 {
h.contentLength = contentTotalLength
h.contentTotalLength = contentTotalLength
}
_, end, err := ParseContentRangeStartAndEnd(resp.Header.Get("Content-Range"))
if err == nil && end != -1 {
@ -291,8 +290,8 @@ func (h *HttpReadSeekCloser) FetchNextChunk() error {
func (h *HttpReadSeekCloser) createRequest() (*http.Request, error) {
if h.notSupportRange {
if h.contentLength != -1 {
h.currentRespMaxOffset = h.contentLength - 1
if h.contentTotalLength != -1 {
h.currentRespMaxOffset = h.contentTotalLength - 1
}
return h.createRequestWithoutRange()
}
@ -303,8 +302,8 @@ func (h *HttpReadSeekCloser) createRequest() (*http.Request, error) {
}
end := h.offset + h.length - 1
if h.contentLength > 0 && end > h.contentLength-1 {
end = h.contentLength - 1
if h.contentTotalLength > 0 && end > h.contentTotalLength-1 {
end = h.contentTotalLength - 1
}
h.currentRespMaxOffset = end
@ -395,12 +394,12 @@ func (h *HttpReadSeekCloser) calculateNewOffset(offset int64, whence int) (int64
}
return h.offset + offset, nil
case io.SeekEnd:
if h.contentLength < 0 {
if h.contentTotalLength < 0 {
if err := h.fetchContentLength(); err != nil {
return 0, fmt.Errorf("failed to fetch content length: %w", err)
}
}
newOffset := h.contentLength - offset
newOffset := h.contentTotalLength - offset
if h.notSupportRange && newOffset != h.offset {
return 0, fmt.Errorf("server does not support range requests, cannot seek to non-zero offset")
}
@ -437,7 +436,7 @@ func (h *HttpReadSeekCloser) fetchContentLength() error {
h.contentType = resp.Header.Get("Content-Type")
h.contentLength = resp.ContentLength
h.contentTotalLength = resp.ContentLength
h.headHeaders = resp.Header.Clone()
return nil
}
@ -454,7 +453,7 @@ func (h *HttpReadSeekCloser) Offset() int64 {
}
func (h *HttpReadSeekCloser) ContentLength() int64 {
return h.contentLength
return h.contentTotalLength
}
func (h *HttpReadSeekCloser) ContentType() (string, error) {
@ -465,12 +464,16 @@ func (h *HttpReadSeekCloser) ContentType() (string, error) {
}
func (h *HttpReadSeekCloser) ContentTotalLength() (int64, error) {
if h.contentLength > 0 {
return h.contentLength, nil
if h.contentTotalLength > 0 {
return h.contentTotalLength, nil
}
return 0, fmt.Errorf("content total length is not available - no successful response received yet")
}
// SetContentTotalLength seeds the total content length from an external
// source (e.g. cached slice metadata) so that range requests are not issued
// past the end of the resource.
//
// Negative values are ignored for consistency with WithContentTotalLength,
// so an unknown-length sentinel can never clobber an already known length.
func (h *HttpReadSeekCloser) SetContentTotalLength(length int64) {
	if length >= 0 {
		h.contentTotalLength = length
	}
}
func ParseContentRangeStartAndEnd(contentRange string) (int64, int64, error) {
if contentRange == "" {
return 0, 0, fmt.Errorf("Content-Range header is empty")

@ -10,6 +10,7 @@ import (
"strings"
"github.com/zijiren233/ksync"
"github.com/zijiren233/stream"
)
var mu = ksync.DefaultKmutex()
@ -21,6 +22,10 @@ type Proxy interface {
ContentType() (string, error)
}
// SetContentTotalLength is implemented by proxies that can accept an
// externally known total content length (e.g. recovered from cached slice
// metadata) to avoid requesting ranges past the end of the resource.
type SetContentTotalLength interface {
	SetContentTotalLength(int64)
}
// Headers defines the interface for accessing response headers
type Headers interface {
Headers() http.Header
@ -45,16 +50,23 @@ func NewSliceCacheProxy(key string, sliceSize int64, r Proxy, cache Cache) *Slic
}
func cacheKey(key string, offset int64, sliceSize int64) string {
key = fmt.Sprintf("%s-%d-%d", key, offset, sliceSize)
hash := sha256.Sum256([]byte(key))
return hex.EncodeToString(hash[:])
hash := sha256.Sum256(stream.StringToBytes(key))
return fmt.Sprintf("%s-%d-%d", hex.EncodeToString(hash[:]), sliceSize, offset)
}
// cachePrefix builds the key prefix shared by every cached slice of the
// resource identified by key at the given slice size, in the form
// "<sha256(key) hex>-<sliceSize>".
func cachePrefix(key string, sliceSize int64) string {
	sum := sha256.Sum256([]byte(key))
	return hex.EncodeToString(sum[:]) + "-" + strconv.FormatInt(sliceSize, 10)
}
func (c *SliceCacheProxy) alignedOffset(offset int64) int64 {
return (offset / c.sliceSize) * c.sliceSize
// alignedOffset rounds offset down to the nearest multiple of sliceSize,
// i.e. the start of the slice containing offset.
func alignedOffset(offset, sliceSize int64) int64 {
	return offset - offset%sliceSize
}
func (c *SliceCacheProxy) fmtContentRange(start, end, total int64) string {
func fmtContentRange(start, end, total int64) string {
if total == -1 && end == -1 {
return "bytes */*"
}
totalStr := "*"
if total >= 0 {
totalStr = strconv.FormatInt(total, 10)
@ -68,14 +80,11 @@ func (c *SliceCacheProxy) fmtContentRange(start, end, total int64) string {
return fmt.Sprintf("bytes %d-%d/%s", start, end, totalStr)
}
func (c *SliceCacheProxy) contentLength(start, end, total int64) int64 {
func contentLength(start, end, total int64) int64 {
if total == -1 && end == -1 {
return -1
}
if end == -1 {
if total == -1 {
return -1
}
return total - start
}
if end >= total && total != -1 {
@ -84,8 +93,8 @@ func (c *SliceCacheProxy) contentLength(start, end, total int64) int64 {
return end - start + 1
}
func (c *SliceCacheProxy) fmtContentLength(start, end, total int64) string {
length := c.contentLength(start, end, total)
func fmtContentLength(start, end, total int64) string {
length := contentLength(start, end, total)
if length == -1 {
return ""
}
@ -101,24 +110,40 @@ func (c *SliceCacheProxy) Proxy(w http.ResponseWriter, r *http.Request) error {
byteRange, err := ParseByteRange(r.Header.Get("Range"))
if err != nil {
http.Error(w, fmt.Sprintf("Failed to parse Range header: %v", err), http.StatusBadRequest)
return err
return fmt.Errorf("failed to parse Range header: %w", err)
}
isRangeRequest := r.Header.Get("Range") != ""
if isRangeRequest {
// avoid the request exceeding the total length of the file due to the large slice size
if st, ok := c.r.(SetContentTotalLength); ok {
cacheItem, ok, err := c.cache.GetAnyWithPrefix(cachePrefix(c.key, c.sliceSize))
if err != nil {
http.Error(w, fmt.Sprintf("Failed to get cache item: %v", err), http.StatusInternalServerError)
return fmt.Errorf("failed to get cache item: %w", err)
}
if ok {
st.SetContentTotalLength(cacheItem.Metadata.ContentTotalLength)
}
}
}
alignedOffset := c.alignedOffset(byteRange.Start)
alignedOffset := alignedOffset(byteRange.Start, c.sliceSize)
cacheItem, err := c.getCacheItem(alignedOffset)
if err != nil {
http.Error(w, fmt.Sprintf("Failed to get cache item: %v", err), http.StatusInternalServerError)
return err
}
c.setResponseHeaders(w, byteRange, cacheItem, r.Header.Get("Range") != "")
c.setResponseHeaders(w, byteRange, cacheItem, isRangeRequest)
if err := c.writeResponse(w, byteRange, alignedOffset, cacheItem); err != nil {
return fmt.Errorf("failed to write response: %w", err)
}
return nil
}
func (c *SliceCacheProxy) setResponseHeaders(w http.ResponseWriter, byteRange *ByteRange, cacheItem *CacheItem, hasRange bool) {
func (c *SliceCacheProxy) setResponseHeaders(w http.ResponseWriter, byteRange *ByteRange, cacheItem *CacheItem, isRangeRequest bool) {
// Copy headers excluding special ones
for k, v := range cacheItem.Metadata.Headers {
switch k {
@ -129,11 +154,11 @@ func (c *SliceCacheProxy) setResponseHeaders(w http.ResponseWriter, byteRange *B
}
}
w.Header().Set("Content-Length", c.fmtContentLength(byteRange.Start, byteRange.End, cacheItem.Metadata.ContentTotalLength))
w.Header().Set("Content-Length", fmtContentLength(byteRange.Start, byteRange.End, cacheItem.Metadata.ContentTotalLength))
w.Header().Set("Content-Type", cacheItem.Metadata.ContentType)
if hasRange {
if isRangeRequest {
w.Header().Set("Accept-Ranges", "bytes")
w.Header().Set("Content-Range", c.fmtContentRange(byteRange.Start, byteRange.End, cacheItem.Metadata.ContentTotalLength))
w.Header().Set("Content-Range", fmtContentRange(byteRange.Start, byteRange.End, cacheItem.Metadata.ContentTotalLength))
w.WriteHeader(http.StatusPartialContent)
} else {
w.WriteHeader(http.StatusOK)
@ -146,7 +171,7 @@ func (c *SliceCacheProxy) writeResponse(w http.ResponseWriter, byteRange *ByteRa
return fmt.Errorf("slice offset cannot be negative, got: %d", sliceOffset)
}
remainingLength := c.contentLength(byteRange.Start, byteRange.End, cacheItem.Metadata.ContentTotalLength)
remainingLength := contentLength(byteRange.Start, byteRange.End, cacheItem.Metadata.ContentTotalLength)
if remainingLength == 0 {
return nil
}

Loading…
Cancel
Save