Feat: proxy buffer

pull/21/head
zijiren233 1 year ago
parent 6d7b7417dd
commit 16f3a69371

@ -173,8 +173,6 @@ golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug
golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs=
golang.org/x/net v0.10.0/go.mod h1:0qNGK6F8kojg2nk9dLZ2mShWaEBan6FAoqfSigmmuDg=
golang.org/x/net v0.15.0/go.mod h1:idbUs1IY1+zTqbi8yxTbhexhEEk5ur9LInksu6HrEpk=
golang.org/x/net v0.16.0 h1:7eBu7KsSvFDtSXUIDbh3aqlK4DPsZ1rByC8PFfBThos=
golang.org/x/net v0.16.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/net v0.17.0 h1:pVaXccu2ozPjCXewfr1S7xza/zcXTity9cCdXQYSjIM=
golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=

@ -0,0 +1,64 @@
package proxy
import "io"
type BufferedReadSeeker struct {
r io.ReadSeeker
buffer []byte
readIdx, writeIdx int
}
func NewBufferedReadSeeker(r io.ReadSeeker, bufSize int) *BufferedReadSeeker {
if bufSize == 0 {
bufSize = 64 * 1024
}
return &BufferedReadSeeker{r: r, buffer: make([]byte, bufSize)}
}
func (b *BufferedReadSeeker) Reset(r io.ReadSeeker) {
b.r = r
b.readIdx, b.writeIdx = 0, 0
}
func (b *BufferedReadSeeker) Read(p []byte) (n int, err error) {
if len(p) == 0 {
return n, err
}
if b.readIdx == b.writeIdx {
if len(p) >= len(b.buffer) {
n, err = b.r.Read(p)
return n, err
}
b.readIdx, b.writeIdx = 0, 0
n, err = b.r.Read(b.buffer)
if n == 0 {
return n, err
}
b.writeIdx += n
}
n = copy(p, b.buffer[b.readIdx:b.writeIdx])
b.readIdx += n
return n, err
}
func (b *BufferedReadSeeker) Seek(offset int64, whence int) (int64, error) {
n, err := b.r.Seek(offset, whence)
b.Reset(b.r)
return n, err
}
func (b *BufferedReadSeeker) ReadAt(p []byte, off int64) (int, error) {
_, err := b.Seek(off, io.SeekStart)
if err != nil {
return 0, err
}
return b.Read(p)
}

@ -0,0 +1,165 @@
package proxy
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"strconv"
)
type HttpReadSeeker struct {
offset int64
url string
contentLength int64
method string
body io.Reader
client *http.Client
headers map[string]string
ctx context.Context
}
type HttpReadSeekerConf func(h *HttpReadSeeker)
func WithHeaders(headers map[string]string) HttpReadSeekerConf {
return func(h *HttpReadSeeker) {
h.headers = headers
}
}
func WithAppendHeaders(headers map[string]string) HttpReadSeekerConf {
return func(h *HttpReadSeeker) {
for k, v := range headers {
h.headers[k] = v
}
}
}
func WithClient(client *http.Client) HttpReadSeekerConf {
return func(h *HttpReadSeeker) {
h.client = client
}
}
func WithMethod(method string) HttpReadSeekerConf {
return func(h *HttpReadSeeker) {
h.method = method
}
}
func WithContext(ctx context.Context) HttpReadSeekerConf {
return func(h *HttpReadSeeker) {
h.ctx = ctx
}
}
func WithBody(body []byte) HttpReadSeekerConf {
return func(h *HttpReadSeeker) {
if len(body) != 0 {
h.body = bytes.NewReader(body)
}
}
}
func WithContentLength(contentLength int64) HttpReadSeekerConf {
return func(h *HttpReadSeeker) {
if contentLength >= 0 {
h.contentLength = contentLength
}
}
}
func NewHttpReadSeeker(url string, conf ...HttpReadSeekerConf) *HttpReadSeeker {
rs := &HttpReadSeeker{
offset: 0,
url: url,
contentLength: -1,
method: http.MethodGet,
}
for _, c := range conf {
c(rs)
}
if rs.client == nil {
rs.client = http.DefaultClient
}
rs.fix()
return rs
}
func NewBufferedHttpReadSeeker(bufSize int, url string, conf ...HttpReadSeekerConf) *BufferedReadSeeker {
if bufSize == 0 {
bufSize = 64 * 1024
}
return &BufferedReadSeeker{r: NewHttpReadSeeker(url, conf...), buffer: make([]byte, bufSize)}
}
func (h *HttpReadSeeker) fix() *HttpReadSeeker {
if h.method == "" {
h.method = http.MethodGet
}
if h.ctx == nil {
h.ctx = context.Background()
}
return h
}
func (h *HttpReadSeeker) Read(p []byte) (n int, err error) {
req, err := http.NewRequestWithContext(h.ctx, h.method, h.url, h.body)
if err != nil {
return 0, err
}
for k, v := range h.headers {
req.Header.Set(k, v)
}
req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", h.offset, h.offset+int64(len(p))-1))
resp, err := h.client.Do(req)
if err != nil {
return 0, err
}
defer resp.Body.Close()
n, err = io.ReadFull(resp.Body, p)
h.offset += int64(n)
return n, err
}
func (h *HttpReadSeeker) Seek(offset int64, whence int) (int64, error) {
switch whence {
case io.SeekStart:
h.offset = offset
case io.SeekCurrent:
h.offset += offset
case io.SeekEnd:
if h.contentLength < 0 {
req, err := http.NewRequestWithContext(h.ctx, http.MethodHead, h.url, nil)
if err != nil {
return 0, err
}
for k, v := range h.headers {
req.Header.Set(k, v)
}
resp, err := h.client.Do(req)
if err != nil {
return 0, err
}
defer resp.Body.Close()
h.contentLength, err = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
if err != nil {
return 0, err
}
if h.contentLength < 0 {
return 0, errors.New("content length error")
}
}
h.offset = h.contentLength - offset
default:
return 0, errors.New("whence value error")
}
return h.offset, nil
}
func (h *HttpReadSeeker) Close() error {
return nil
}

@ -3,7 +3,6 @@ package handlers
import (
"errors"
"fmt"
"io"
"net/http"
"net/url"
"path"
@ -18,6 +17,7 @@ import (
json "github.com/json-iterator/go"
"github.com/synctv-org/synctv/internal/conf"
pb "github.com/synctv-org/synctv/proto"
"github.com/synctv-org/synctv/proxy"
"github.com/synctv-org/synctv/room"
"github.com/synctv-org/synctv/utils"
"github.com/zijiren233/livelib/av"
@ -193,6 +193,7 @@ func PushMovie(ctx *gin.Context) {
ctx.AbortWithStatusJSON(http.StatusBadRequest, NewApiErrorStringResp("movie proxy is not enabled"))
return
}
movie.PullKey = uuid.New().String()
fallthrough
case !movie.Live && !movie.Proxy, movie.Live && !movie.Proxy && !movie.RtmpSource:
u, err := url.Parse(movie.Url)
@ -513,58 +514,65 @@ const UserAgent = `Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/5
func ProxyMovie(ctx *gin.Context) {
rooms := ctx.Value("rooms").(*room.Rooms)
roomid := ctx.Query("roomid")
if roomid == "" {
ctx.AbortWithStatusJSON(http.StatusBadRequest, NewApiErrorStringResp("roomid is empty"))
roomId := ctx.Param("roomId")
if roomId == "" {
ctx.AbortWithStatusJSON(http.StatusBadRequest, NewApiErrorStringResp("roomId is empty"))
return
}
room, err := rooms.GetRoom(roomid)
room, err := rooms.GetRoom(roomId)
if err != nil {
ctx.AbortWithStatusJSON(http.StatusBadRequest, NewApiErrorResp(err))
}
cm := room.Current().Movie
if !cm.Proxy || cm.Live {
ctx.AbortWithStatusJSON(http.StatusBadRequest, NewApiErrorStringResp("not support proxy"))
return
}
u, err := url.Parse(cm.Url)
m, err := room.GetMovieWithPullKey(ctx.Param("pullKey"))
if err != nil {
ctx.AbortWithStatusJSON(http.StatusBadRequest, NewApiErrorResp(err))
return
}
req := resty.New().R().
SetHeader("Range", ctx.GetHeader("Range")).
SetHeader("User-Agent", UserAgent).
SetHeader("Referer", fmt.Sprintf("%s://%s/", u.Scheme, u.Host)).
SetHeader("Origin", fmt.Sprintf("%s://%s", u.Scheme, u.Host)).
SetHeader("Accept", ctx.GetHeader("Accept")).
SetHeader("Accept-Encoding", ctx.GetHeader("Accept-Encoding")).
SetHeader("Accept-Language", ctx.GetHeader("Accept-Language"))
if cm.Headers != nil {
for k, v := range cm.Headers {
req.SetHeader(k, v)
}
if !m.Proxy || m.Live || m.RtmpSource {
ctx.AbortWithStatusJSON(http.StatusBadRequest, NewApiErrorStringResp("not support proxy"))
return
}
resp, err := req.Get(cm.Url)
r := resty.New().R()
for k, v := range m.Headers {
r.SetHeader(k, v)
}
resp, err := r.Head(m.Url)
if err != nil {
ctx.AbortWithStatusJSON(http.StatusInternalServerError, NewApiErrorResp(err))
return
}
defer resp.RawBody().Close()
if _, ok := allowedProxyMovieType[resp.Header().Get("Content-Type")]; !ok {
ctx.AbortWithStatusJSON(http.StatusBadRequest, NewApiErrorResp(fmt.Errorf("this movie type support proxy: %s", resp.Header().Get("Content-Type"))))
return
}
for k, v := range resp.Header() {
ctx.Header(k, v[0])
}
ctx.Status(resp.StatusCode())
io.Copy(ctx.Writer, resp.RawBody())
ctx.Header("Content-Type", resp.Header().Get("Content-Type"))
l := resp.Header().Get("Content-Length")
ctx.Header("Content-Length", l)
length, err := strconv.ParseInt(l, 10, 64)
if err != nil {
ctx.AbortWithStatusJSON(http.StatusInternalServerError, NewApiErrorResp(err))
return
}
hrs := proxy.NewBufferedHttpReadSeeker(128*1024, m.Url,
proxy.WithContext(ctx),
proxy.WithHeaders(m.Headers),
proxy.WithContext(ctx),
proxy.WithContentLength(length),
)
name := resp.Header().Get("Content-Disposition")
if name == "" {
name = m.Url
}
http.ServeContent(ctx.Writer, ctx.Request, name, time.Now(), hrs)
}
type FormatErrNotSupportFileType string

@ -105,7 +105,7 @@ func Init(e *gin.Engine, s *rtmps.Server, r *room.Rooms) {
movie.POST("/clear", ClearMovies)
movie.GET("/proxy/:roomid", ProxyMovie)
movie.GET("/proxy/:roomId/:pullKey", ProxyMovie)
{
live := movie.Group("/live")

Loading…
Cancel
Save