diff --git a/internal/model/movie.go b/internal/model/movie.go index 52e0f80..232d49c 100644 --- a/internal/model/movie.go +++ b/internal/model/movie.go @@ -37,5 +37,6 @@ type BaseMovie struct { type VendorInfo struct { Vendor StreamingVendor `json:"vendor"` + Shared bool `gorm:"not null;default:false" json:"shared"` Info map[string]any `gorm:"serializer:fastjson" json:"info"` } diff --git a/internal/op/hub.go b/internal/op/hub.go index 30423a7..8fef438 100644 --- a/internal/op/hub.go +++ b/internal/op/hub.go @@ -205,3 +205,14 @@ func (h *Hub) UnRegClient(user *User) error { func (h *Hub) ClientNum() int64 { return h.clients.Len() } + +func (h *Hub) SendToUser(userID string, data Message) error { + if h.Closed() { + return ErrAlreadyClosed + } + cli, ok := h.clients.Load(userID) + if !ok { + return nil + } + return cli.Send(data) +} diff --git a/internal/op/message.go b/internal/op/message.go index 69471be..13fb22a 100644 --- a/internal/op/message.go +++ b/internal/op/message.go @@ -14,9 +14,11 @@ type Message interface { MessageType() int String() string Encode(w io.Writer) error + BeforeSend(sendTo *User) error } type ElementJsonMessage struct { + BeforeSendFunc func(sendTo *User) error *pb.ElementMessage } @@ -32,7 +34,15 @@ func (em *ElementJsonMessage) Encode(w io.Writer) error { return json.NewEncoder(w).Encode(em) } +func (em *ElementJsonMessage) BeforeSend(sendTo *User) error { + if em.BeforeSendFunc != nil { + return em.BeforeSendFunc(sendTo) + } + return nil +} + type ElementMessage struct { + BeforeSendFunc func(sendTo *User) error *pb.ElementMessage } @@ -53,6 +63,13 @@ func (em *ElementMessage) Encode(w io.Writer) error { return err } +func (em *ElementMessage) BeforeSend(sendTo *User) error { + if em.BeforeSendFunc != nil { + return em.BeforeSendFunc(sendTo) + } + return nil +} + type PingMessage struct{} func (pm *PingMessage) MessageType() int { @@ -66,3 +83,7 @@ func (pm *PingMessage) String() string { func (pm *PingMessage) Encode(w io.Writer) error { return nil } + +func (pm *PingMessage) BeforeSend(sendTo *User) error { + return nil +} diff --git a/internal/op/room.go b/internal/op/room.go index 09146f5..6e2da43 100644 --- a/internal/op/room.go +++ b/internal/op/room.go @@ -43,6 +43,13 @@ func (r *Room) Broadcast(data Message, conf ...BroadcastConf) error { return r.hub.Broadcast(data, conf...) } +func (r *Room) SendToUser(user *User, data Message) error { + if r.hub == nil { + return nil + } + return r.hub.SendToUser(user.ID, data) +} + func (r *Room) GetChannel(channelName string) (*rtmps.Channel, error) { return r.movies.GetChannel(channelName) } diff --git a/server/handlers/movie.go b/server/handlers/movie.go index b0efd77..2bcc4d0 100644 --- a/server/handlers/movie.go +++ b/server/handlers/movie.go @@ -49,7 +49,7 @@ func GetPageItems[T any](ctx *gin.Context, items []T) ([]T, error) { func MovieList(ctx *gin.Context) { room := ctx.MustGet("room").(*op.Room) - // user := ctx.MustGet("user").(*op.User) + user := ctx.MustGet("user").(*op.User) page, max, err := GetPageAndPageSize(ctx) if err != nil { @@ -68,7 +68,7 @@ func MovieList(ctx *gin.Context) { } } - current, err := genCurrent(room) + current, err := genCurrent(room.Current(), user) if err != nil { ctx.AbortWithStatusJSON(http.StatusInternalServerError, model.NewApiErrorResp(err)) return @@ -83,10 +83,9 @@ func MovieList(ctx *gin.Context) { })) } -func genCurrent(room *op.Room) (*op.Current, error) { - current := room.Current() +func genCurrent(current *op.Current, user *op.User) (*op.Current, error) { if current.Movie.Base.Vendor != "" { - return current, parse2VendorMovie(¤t.Movie) + return current, parse2VendorMovie(user, ¤t.Movie) } return current, nil } @@ -104,9 +103,9 @@ func genCurrentResp(current *op.Current) *model.CurrentMovieResp { func CurrentMovie(ctx *gin.Context) { room := ctx.MustGet("room").(*op.Room) - // user := ctx.MustGet("user").(*op.User) + user := ctx.MustGet("user").(*op.User) - current, err := genCurrent(room) + current, err := genCurrent(room.Current(), user) if err != nil { ctx.AbortWithStatusJSON(http.StatusInternalServerError, model.NewApiErrorResp(err)) return @@ -344,23 +343,55 @@ func ChangeCurrentMovie(ctx *gin.Context) { return } - current, err := genCurrent(room) + current, err := genCurrent(room.Current(), user) if err != nil { ctx.AbortWithStatusJSON(http.StatusInternalServerError, model.NewApiErrorResp(err)) return } - current.UpdateSeek() - if err := room.Broadcast(&op.ElementMessage{ - ElementMessage: &pb.ElementMessage{ - Type: pb.ElementMessageType_CHANGE_CURRENT, - Sender: user.Username, - Current: current.Proto(), - }, - }); err != nil { - ctx.AbortWithStatusJSON(http.StatusInternalServerError, model.NewApiErrorResp(err)) - return + if current.Movie.Base.VendorInfo.Shared { + if err := room.Broadcast(&op.ElementMessage{ + ElementMessage: &pb.ElementMessage{ + Type: pb.ElementMessageType_CHANGE_CURRENT, + Sender: user.Username, + Current: current.Proto(), + }, + }); err != nil { + ctx.AbortWithStatusJSON(http.StatusInternalServerError, model.NewApiErrorResp(err)) + return + } + } else { + if err := room.SendToUser(user, &op.ElementMessage{ + ElementMessage: &pb.ElementMessage{ + Type: pb.ElementMessageType_CHANGE_CURRENT, + Sender: user.Username, + Current: current.Proto(), + }, + }); err != nil { + ctx.AbortWithStatusJSON(http.StatusInternalServerError, model.NewApiErrorResp(err)) + return + } + + m := &pb.ElementMessage{ + Type: pb.ElementMessageType_CHANGE_CURRENT, + Sender: user.Username, + } + if err := room.Broadcast(&op.ElementMessage{ + ElementMessage: m, + BeforeSendFunc: func(sendTo *op.User) error { + current, err := genCurrent(room.Current(), sendTo) + if err != nil { + return err + } + current.UpdateSeek() + m.Current = current.Proto() + return nil + }, + }, op.WithIgnoreId(user.ID)); err != nil { + ctx.AbortWithStatusJSON(http.StatusInternalServerError, model.NewApiErrorResp(err)) + return + } } ctx.Status(http.StatusNoContent) @@ -582,7 +613,7 @@ func ProxyVendorMovie(ctx *gin.Context, m *dbModel.Movie) { } } -func parse2VendorMovie(movie *dbModel.Movie) error { +func parse2VendorMovie(user *op.User, movie *dbModel.Movie) (err error) { if movie.Base.Proxy { return nil } @@ -607,10 +638,10 @@ func parse2VendorMovie(movie *dbModel.Movie) error { return fmt.Errorf("bvid is not string") } } else if epIdI != nil { - // epId, ok = epIdI.(float64) - // if !ok { - // return fmt.Errorf("epId is not number") - // } + _, ok = epIdI.(float64) + if !ok { + return fmt.Errorf("epId is not number") + } } else { return fmt.Errorf("bvid and epId is empty") } @@ -625,12 +656,24 @@ func parse2VendorMovie(movie *dbModel.Movie) error { } if bvid != "" { - vendor, err := db.AssignFirstOrCreateVendorByUserIDAndVendor(movie.CreatorID, dbModel.StreamingVendorBilibili) + id := user.ID + if movie.Base.VendorInfo.Shared { + id = movie.CreatorID + } + vendor, err := db.AssignFirstOrCreateVendorByUserIDAndVendor(id, dbModel.StreamingVendorBilibili) if err != nil { return err } cli := bilibili.NewClient(vendor.Cookies) - mu, err := cli.GetVideoURL(0, bvid, uint(cid)) + var qn float64 = float64(bilibili.Q1080PP) + qnI, ok := movie.Base.VendorInfo.Info["qn"] + if ok { + qn, ok = qnI.(float64) + if !ok { + return fmt.Errorf("qn is not number") + } + } + mu, err := cli.GetVideoURL(0, bvid, uint(cid), bilibili.WithQuality(uint(qn))) if err != nil { return err } diff --git a/server/handlers/websocket.go b/server/handlers/websocket.go index 8a2a754..6b6be67 100644 --- a/server/handlers/websocket.go +++ b/server/handlers/websocket.go @@ -69,11 +69,17 @@ func handleWriterMessage(c *op.Client) error { return err } - if err := v.Encode(wc); err != nil { - log.Debugf("ws: room %s user %s encode message error: %v", c.Room().Name, c.User().Username, err) + if err = v.BeforeSend(c.User()); err != nil { + log.Debugf("ws: room %s user %s before send message error: %v", c.Room().Name, c.User().Username, err) continue } - if err := wc.Close(); err != nil { + + if err = v.Encode(wc); err != nil { + log.Debugf("ws: room %s user %s encode message error: %v", c.Room().Name, c.User().Username, err) + return err + } + + if err = wc.Close(); err != nil { return err } } diff --git a/vendors/bilibili/movie.go b/vendors/bilibili/movie.go index 93f9e9a..6cb61af 100644 --- a/vendors/bilibili/movie.go +++ b/vendors/bilibili/movie.go @@ -88,6 +88,12 @@ type GetVideoURLConf struct { type GetVideoURLConfig func(*GetVideoURLConf) +func WithQuality(q uint) GetVideoURLConfig { + return func(c *GetVideoURLConf) { + c.Quality = q + } +} + // https://github.com/SocialSisterYi/bilibili-API-collect/blob/master/docs/video/videostream_url.md func (c *Client) GetVideoURL(aid uint, bvid string, cid uint, conf ...GetVideoURLConfig) (*VideoURL, error) { config := &GetVideoURLConf{