Fix: nocopy

pull/31/head
zijiren233 2 years ago
parent eaca40b5a0
commit f0fe40a184

@ -2,12 +2,9 @@ package model
import ( import (
"fmt" "fmt"
"sync/atomic"
"time" "time"
"github.com/synctv-org/synctv/utils" "github.com/synctv-org/synctv/utils"
"github.com/zijiren233/gencontainer/refreshcache"
"github.com/zijiren233/gencontainer/rwmap"
"gorm.io/gorm" "gorm.io/gorm"
) )
@ -19,46 +16,6 @@ type Movie struct {
RoomID string `gorm:"not null;index" json:"-"` RoomID string `gorm:"not null;index" json:"-"`
CreatorID string `gorm:"not null;index" json:"creatorId"` CreatorID string `gorm:"not null;index" json:"creatorId"`
Base BaseMovie `gorm:"embedded;embeddedPrefix:base_" json:"base"` Base BaseMovie `gorm:"embedded;embeddedPrefix:base_" json:"base"`
Cache BaseCache `gorm:"-:all" json:"-"`
}
type BaseCache struct {
URL rwmap.RWMap[string, *refreshcache.RefreshCache[string]]
MPD atomic.Pointer[refreshcache.RefreshCache[*MPDCache]]
}
type MPDCache struct {
MPDFile string
URLs []string
}
func (b *BaseCache) Clear() {
b.MPD.Store(nil)
b.URL.Clear()
}
func (b *BaseCache) InitOrLoadURLCache(id string, refreshFunc func() (string, error), maxAge time.Duration) (*refreshcache.RefreshCache[string], error) {
c, loaded := b.URL.Load(id)
if loaded {
return c, nil
}
c, _ = b.URL.LoadOrStore(id, refreshcache.NewRefreshCache[string](refreshFunc, maxAge))
return c, nil
}
func (b *BaseCache) InitOrLoadMPDCache(refreshFunc func() (*MPDCache, error), maxAge time.Duration) (*refreshcache.RefreshCache[*MPDCache], error) {
c := b.MPD.Load()
if c != nil {
return c, nil
}
c = refreshcache.NewRefreshCache[*MPDCache](refreshFunc, maxAge)
if b.MPD.CompareAndSwap(nil, c) {
return c, nil
} else {
return b.InitOrLoadMPDCache(refreshFunc, maxAge)
}
} }
func (m *Movie) BeforeCreate(tx *gorm.DB) error { func (m *Movie) BeforeCreate(tx *gorm.DB) error {

@ -14,14 +14,17 @@ type current struct {
} }
type Current struct { type Current struct {
Movie model.Movie `json:"movie"` Movie *Movie `json:"movie"`
Status Status `json:"status"` Status Status `json:"status"`
} }
func newCurrent() *current { func newCurrent() *current {
return &current{ return &current{
current: Current{ current: Current{
Status: newStatus(), Status: newStatus(),
Movie: &Movie{
Movie: &model.Movie{},
},
}, },
} }
} }
@ -48,18 +51,24 @@ func (c *current) Current() Current {
return c.current return c.current
} }
func (c *current) Movie() model.Movie { func (c *current) Movie() *Movie {
c.lock.RLock() c.lock.RLock()
defer c.lock.RUnlock() defer c.lock.RUnlock()
return c.current.Movie return c.current.Movie
} }
func (c *current) SetMovie(movie model.Movie, play bool) { func (c *current) SetMovie(movie *Movie, play bool) {
c.lock.Lock() c.lock.Lock()
defer c.lock.Unlock() defer c.lock.Unlock()
c.current.Movie = movie if movie == nil || movie.Movie == nil {
c.current.Movie = &Movie{
Movie: &model.Movie{},
}
} else {
c.current.Movie = movie
}
c.current.SetSeek(0, 0) c.current.SetSeek(0, 0)
c.current.Status.Playing = play c.current.Status.Playing = play
} }
@ -87,7 +96,14 @@ func (c *current) SetSeekRate(seek, rate, timeDiff float64) Status {
func (c *Current) Proto() *pb.Current { func (c *Current) Proto() *pb.Current {
current := &pb.Current{ current := &pb.Current{
Movie: &pb.MovieInfo{ Status: &pb.Status{
Seek: c.Status.Seek,
Rate: c.Status.Rate,
Playing: c.Status.Playing,
},
}
if c.Movie != nil {
current.Movie = &pb.MovieInfo{
Id: c.Movie.ID, Id: c.Movie.ID,
Base: &pb.BaseMovieInfo{ Base: &pb.BaseMovieInfo{
Url: c.Movie.Base.Url, Url: c.Movie.Base.Url,
@ -100,26 +116,21 @@ func (c *Current) Proto() *pb.Current {
}, },
CreatedAt: c.Movie.CreatedAt.UnixMilli(), CreatedAt: c.Movie.CreatedAt.UnixMilli(),
Creator: GetUserName(c.Movie.CreatorID), Creator: GetUserName(c.Movie.CreatorID),
},
Status: &pb.Status{
Seek: c.Status.Seek,
Rate: c.Status.Rate,
Playing: c.Status.Playing,
},
}
if c.Movie.Base.VendorInfo.Vendor != "" {
current.Movie.Base.VendorInfo = &pb.VendorInfo{
Vendor: string(c.Movie.Base.VendorInfo.Vendor),
Shared: c.Movie.Base.VendorInfo.Shared,
} }
switch c.Movie.Base.VendorInfo.Vendor { if c.Movie.Base.VendorInfo.Vendor != "" {
case model.StreamingVendorBilibili: current.Movie.Base.VendorInfo = &pb.VendorInfo{
current.Movie.Base.VendorInfo.Bilibili = &pb.BilibiliVendorInfo{ Vendor: string(c.Movie.Base.VendorInfo.Vendor),
Bvid: c.Movie.Base.VendorInfo.Bilibili.Bvid, Shared: c.Movie.Base.VendorInfo.Shared,
Cid: c.Movie.Base.VendorInfo.Bilibili.Cid, }
Epid: c.Movie.Base.VendorInfo.Bilibili.Epid, switch c.Movie.Base.VendorInfo.Vendor {
Quality: c.Movie.Base.VendorInfo.Bilibili.Quality, case model.StreamingVendorBilibili:
VendorName: c.Movie.Base.VendorInfo.Bilibili.VendorName, current.Movie.Base.VendorInfo.Bilibili = &pb.BilibiliVendorInfo{
Bvid: c.Movie.Base.VendorInfo.Bilibili.Bvid,
Cid: c.Movie.Base.VendorInfo.Bilibili.Cid,
Epid: c.Movie.Base.VendorInfo.Bilibili.Epid,
Quality: c.Movie.Base.VendorInfo.Bilibili.Quality,
VendorName: c.Movie.Base.VendorInfo.Bilibili.VendorName,
}
} }
} }
} }

@ -5,6 +5,7 @@ import (
"fmt" "fmt"
"net/url" "net/url"
"sync" "sync"
"sync/atomic"
"time" "time"
"github.com/go-resty/resty/v2" "github.com/go-resty/resty/v2"
@ -12,6 +13,8 @@ import (
"github.com/synctv-org/synctv/internal/model" "github.com/synctv-org/synctv/internal/model"
"github.com/synctv-org/synctv/internal/settings" "github.com/synctv-org/synctv/internal/settings"
"github.com/synctv-org/synctv/utils" "github.com/synctv-org/synctv/utils"
"github.com/zijiren233/gencontainer/refreshcache"
"github.com/zijiren233/gencontainer/rwmap"
"github.com/zijiren233/livelib/av" "github.com/zijiren233/livelib/av"
"github.com/zijiren233/livelib/container/flv" "github.com/zijiren233/livelib/container/flv"
"github.com/zijiren233/livelib/protocol/hls" "github.com/zijiren233/livelib/protocol/hls"
@ -24,6 +27,46 @@ type Movie struct {
*model.Movie *model.Movie
lock sync.RWMutex lock sync.RWMutex
channel *rtmps.Channel channel *rtmps.Channel
cache *BaseCache `gorm:"-:all" json:"-"`
}
type BaseCache struct {
URL rwmap.RWMap[string, *refreshcache.RefreshCache[string]]
MPD atomic.Pointer[refreshcache.RefreshCache[*MPDCache]]
}
type MPDCache struct {
MPDFile string
URLs []string
}
func (b *BaseCache) Clear() {
b.MPD.Store(nil)
b.URL.Clear()
}
func (b *BaseCache) InitOrLoadURLCache(id string, refreshFunc func() (string, error), maxAge time.Duration) (*refreshcache.RefreshCache[string], error) {
c, loaded := b.URL.Load(id)
if loaded {
return c, nil
}
c, _ = b.URL.LoadOrStore(id, refreshcache.NewRefreshCache[string](refreshFunc, maxAge))
return c, nil
}
func (b *BaseCache) InitOrLoadMPDCache(refreshFunc func() (*MPDCache, error), maxAge time.Duration) (*refreshcache.RefreshCache[*MPDCache], error) {
c := b.MPD.Load()
if c != nil {
return c, nil
}
c = refreshcache.NewRefreshCache[*MPDCache](refreshFunc, maxAge)
if b.MPD.CompareAndSwap(nil, c) {
return c, nil
} else {
return b.InitOrLoadMPDCache(refreshFunc, maxAge)
}
} }
func (m *Movie) Channel() (*rtmps.Channel, error) { func (m *Movie) Channel() (*rtmps.Channel, error) {
@ -32,6 +75,15 @@ func (m *Movie) Channel() (*rtmps.Channel, error) {
return m.channel, m.init() return m.channel, m.init()
} }
func (m *Movie) Cache() *BaseCache {
m.lock.Lock()
defer m.lock.Unlock()
if m.cache == nil {
m.cache = &BaseCache{}
}
return m.cache
}
func genTsName() string { func genTsName() string {
return utils.SortUUID() return utils.SortUUID()
} }
@ -40,6 +92,7 @@ func (m *Movie) init() (err error) {
if err = m.Validate(); err != nil { if err = m.Validate(); err != nil {
return return
} }
switch { switch {
case m.Base.Live && m.Base.RtmpSource: case m.Base.Live && m.Base.RtmpSource:
if m.channel == nil { if m.channel == nil {
@ -209,13 +262,15 @@ func (m *Movie) terminate() {
m.channel.Close() m.channel.Close()
m.channel = nil m.channel = nil
} }
m.Cache.Clear() if m.cache != nil {
m.cache.Clear()
}
} }
func (m *Movie) Update(movie model.BaseMovie) error { func (m *Movie) Update(movie *model.BaseMovie) error {
m.lock.Lock() m.lock.Lock()
defer m.lock.Unlock() defer m.lock.Unlock()
m.terminate() m.terminate()
m.Movie.Base = movie m.Movie.Base = *movie
return m.init() return m.init()
} }

@ -112,7 +112,7 @@ func (m *movies) GetChannel(id string) (*rtmps.Channel, error) {
return nil, errors.New("channel not found") return nil, errors.New("channel not found")
} }
func (m *movies) Update(movieId string, movie model.BaseMovie) error { func (m *movies) Update(movieId string, movie *model.BaseMovie) error {
m.lock.Lock() m.lock.Lock()
defer m.lock.Unlock() defer m.lock.Unlock()
m.init() m.init()

@ -69,7 +69,7 @@ func (r *Room) CheckVersion(version uint32) bool {
return atomic.LoadUint32(&r.version) == version return atomic.LoadUint32(&r.version) == version
} }
func (r *Room) UpdateMovie(movieId string, movie model.BaseMovie) error { func (r *Room) UpdateMovie(movieId string, movie *model.BaseMovie) error {
return r.movies.Update(movieId, movie) return r.movies.Update(movieId, movie)
} }
@ -185,12 +185,12 @@ func (r *Room) SetCurrentMovieByID(id string, play bool) error {
if err != nil { if err != nil {
return err return err
} }
r.SetCurrentMovie(m.Movie, play) r.SetCurrentMovie(m, play)
return nil return nil
} }
func (r *Room) SetCurrentMovie(movie *model.Movie, play bool) { func (r *Room) SetCurrentMovie(movie *Movie, play bool) {
r.current.SetMovie(*movie, play) r.current.SetMovie(movie, play)
} }
func (r *Room) SwapMoviePositions(id1, id2 string) error { func (r *Room) SwapMoviePositions(id1, id2 string) error {

@ -122,7 +122,7 @@ func (u *User) SetUsername(username string) error {
return nil return nil
} }
func (u *User) UpdateMovie(room *Room, movieID string, movie model.BaseMovie) error { func (u *User) UpdateMovie(room *Room, movieID string, movie *model.BaseMovie) error {
m, err := room.GetMovieByID(movieID) m, err := room.GetMovieByID(movieID)
if err != nil { if err != nil {
return err return err
@ -176,7 +176,7 @@ func (u *User) ClearMovies(room *Room) error {
return room.ClearMovies() return room.ClearMovies()
} }
func (u *User) SetCurrentMovie(room *Room, movie *model.Movie, play bool) error { func (u *User) SetCurrentMovie(room *Room, movie *Movie, play bool) error {
if !u.HasRoomPermission(room, model.PermissionEditCurrent) { if !u.HasRoomPermission(room, model.PermissionEditCurrent) {
return model.ErrNoPermission return model.ErrNoPermission
} }
@ -189,5 +189,5 @@ func (u *User) SetCurrentMovieByID(room *Room, movieID string, play bool) error
if err != nil { if err != nil {
return err return err
} }
return u.SetCurrentMovie(room, m.Movie, play) return u.SetCurrentMovie(room, m, play)
} }

@ -80,8 +80,9 @@ func MovieList(ctx *gin.Context) {
Base: v.Base, Base: v.Base,
Creator: op.GetUserName(v.CreatorID), Creator: op.GetUserName(v.CreatorID),
} }
// hide headers when proxy // hide url and headers when proxy
if mresp[i].Base.Proxy { if mresp[i].Base.Proxy {
mresp[i].Base.Url = ""
mresp[i].Base.Headers = nil mresp[i].Base.Headers = nil
} }
} }
@ -97,7 +98,7 @@ func MovieList(ctx *gin.Context) {
func genCurrent(ctx context.Context, current *op.Current, userID string) (*op.Current, error) { func genCurrent(ctx context.Context, current *op.Current, userID string) (*op.Current, error) {
if current.Movie.Base.VendorInfo.Vendor != "" { if current.Movie.Base.VendorInfo.Vendor != "" {
return current, parse2VendorMovie(ctx, userID, &current.Movie) return current, parse2VendorMovie(ctx, userID, current.Movie)
} }
return current, nil return current, nil
} }
@ -111,8 +112,9 @@ func genCurrentResp(current *op.Current) *model.CurrentMovieResp {
Creator: op.GetUserName(current.Movie.CreatorID), Creator: op.GetUserName(current.Movie.CreatorID),
}, },
} }
// hide headers when proxy // hide url and headers when proxy
if c.Movie.Base.Proxy { if c.Movie.Base.Proxy {
c.Movie.Base.Url = ""
c.Movie.Base.Headers = nil c.Movie.Base.Headers = nil
} }
return c return c
@ -154,8 +156,9 @@ func Movies(ctx *gin.Context) {
Base: v.Base, Base: v.Base,
Creator: op.GetUserName(v.CreatorID), Creator: op.GetUserName(v.CreatorID),
} }
// hide headers when proxy // hide url and headers when proxy
if mresp[i].Base.Proxy { if mresp[i].Base.Proxy {
mresp[i].Base.Url = ""
mresp[i].Base.Headers = nil mresp[i].Base.Headers = nil
} }
} }
@ -297,7 +300,7 @@ func EditMovie(ctx *gin.Context) {
return return
} }
if err := user.UpdateMovie(room, req.Id, dbModel.BaseMovie(req.PushMovieReq)); err != nil { if err := user.UpdateMovie(room, req.Id, (*dbModel.BaseMovie)(&req.PushMovieReq)); err != nil {
ctx.AbortWithStatusJSON(http.StatusBadRequest, model.NewApiErrorResp(err)) ctx.AbortWithStatusJSON(http.StatusBadRequest, model.NewApiErrorResp(err))
return return
} }
@ -405,7 +408,7 @@ func ChangeCurrentMovie(ctx *gin.Context) {
} }
if req.Id == "" { if req.Id == "" {
err := user.SetCurrentMovie(room, &dbModel.Movie{}, false) err := user.SetCurrentMovie(room, nil, false)
if err != nil { if err != nil {
ctx.AbortWithStatusJSON(http.StatusBadRequest, model.NewApiErrorResp(err)) ctx.AbortWithStatusJSON(http.StatusBadRequest, model.NewApiErrorResp(err))
return return
@ -498,13 +501,13 @@ func ProxyMovie(ctx *gin.Context) {
} }
if m.Base.VendorInfo.Vendor != "" { if m.Base.VendorInfo.Vendor != "" {
proxyVendorMovie(ctx, m.Movie) proxyVendorMovie(ctx, m)
return return
} }
switch m.Base.Type { switch m.Base.Type {
case "mpd": case "mpd":
mpdCache, err := m.Cache.InitOrLoadMPDCache(initDashCache(ctx, m.Movie), time.Minute*5) mpdCache, err := m.Cache().InitOrLoadMPDCache(initDashCache(ctx, m.Movie), time.Minute*5)
if err != nil { if err != nil {
ctx.AbortWithStatusJSON(http.StatusInternalServerError, model.NewApiErrorResp(err)) ctx.AbortWithStatusJSON(http.StatusInternalServerError, model.NewApiErrorResp(err))
return return
@ -526,8 +529,8 @@ func ProxyMovie(ctx *gin.Context) {
} }
// only cache mpd file // only cache mpd file
func initDashCache(ctx context.Context, movie *dbModel.Movie) func() (*dbModel.MPDCache, error) { func initDashCache(ctx context.Context, movie *dbModel.Movie) func() (*op.MPDCache, error) {
return func() (*dbModel.MPDCache, error) { return func() (*op.MPDCache, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, movie.Base.Url, nil) req, err := http.NewRequestWithContext(ctx, http.MethodGet, movie.Base.Url, nil)
if err != nil { if err != nil {
return nil, err return nil, err
@ -560,7 +563,7 @@ func initDashCache(ctx context.Context, movie *dbModel.Movie) func() (*dbModel.M
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &dbModel.MPDCache{ return &op.MPDCache{
MPDFile: s, MPDFile: s,
}, nil }, nil
} }
@ -692,8 +695,8 @@ func JoinLive(ctx *gin.Context) {
} }
} }
func initBilibiliMPDCache(ctx context.Context, roomID, movieID, CreatorID string, info *dbModel.BilibiliVendorInfo) func() (*dbModel.MPDCache, error) { func initBilibiliMPDCache(ctx context.Context, roomID, movieID, CreatorID string, info *dbModel.BilibiliVendorInfo) func() (*op.MPDCache, error) {
return func() (*dbModel.MPDCache, error) { return func() (*op.MPDCache, error) {
v, err := db.FirstOrInitVendorByUserIDAndVendor(CreatorID, dbModel.StreamingVendorBilibili) v, err := db.FirstOrInitVendorByUserIDAndVendor(CreatorID, dbModel.StreamingVendorBilibili)
if err != nil { if err != nil {
return nil, err return nil, err
@ -746,7 +749,7 @@ func initBilibiliMPDCache(ctx context.Context, roomID, movieID, CreatorID string
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &dbModel.MPDCache{ return &op.MPDCache{
URLs: movies, URLs: movies,
MPDFile: s, MPDFile: s,
}, nil }, nil
@ -787,10 +790,10 @@ func initBilibiliShareCache(ctx context.Context, CreatorID string, info *dbModel
} }
} }
func proxyVendorMovie(ctx *gin.Context, movie *dbModel.Movie) { func proxyVendorMovie(ctx *gin.Context, movie *op.Movie) {
switch movie.Base.VendorInfo.Vendor { switch movie.Base.VendorInfo.Vendor {
case dbModel.StreamingVendorBilibili: case dbModel.StreamingVendorBilibili:
bvc, err := movie.Cache.InitOrLoadMPDCache(initBilibiliMPDCache(ctx, movie.RoomID, movie.ID, movie.CreatorID, movie.Base.VendorInfo.Bilibili), time.Minute*119) bvc, err := movie.Cache().InitOrLoadMPDCache(initBilibiliMPDCache(ctx, movie.RoomID, movie.ID, movie.CreatorID, movie.Base.VendorInfo.Bilibili), time.Minute*119)
if err != nil { if err != nil {
ctx.AbortWithStatusJSON(http.StatusInternalServerError, model.NewApiErrorResp(err)) ctx.AbortWithStatusJSON(http.StatusInternalServerError, model.NewApiErrorResp(err))
return return
@ -823,7 +826,7 @@ func proxyVendorMovie(ctx *gin.Context, movie *dbModel.Movie) {
} }
} }
func parse2VendorMovie(ctx context.Context, userID string, movie *dbModel.Movie) (err error) { func parse2VendorMovie(ctx context.Context, userID string, movie *op.Movie) (err error) {
if movie.Base.VendorInfo.Shared { if movie.Base.VendorInfo.Shared {
userID = movie.CreatorID userID = movie.CreatorID
} }
@ -831,7 +834,7 @@ func parse2VendorMovie(ctx context.Context, userID string, movie *dbModel.Movie)
switch movie.Base.VendorInfo.Vendor { switch movie.Base.VendorInfo.Vendor {
case dbModel.StreamingVendorBilibili: case dbModel.StreamingVendorBilibili:
if !movie.Base.Proxy { if !movie.Base.Proxy {
c, err := movie.Cache.InitOrLoadURLCache(userID, initBilibiliShareCache(ctx, movie.CreatorID, movie.Base.VendorInfo.Bilibili), time.Minute*119) c, err := movie.Cache().InitOrLoadURLCache(userID, initBilibiliShareCache(ctx, movie.CreatorID, movie.Base.VendorInfo.Bilibili), time.Minute*119)
if err != nil { if err != nil {
return err return err
} }

Loading…
Cancel
Save