feat: bili danmu

pull/282/head
zijiren233 10 months ago
parent 7034069983
commit 18c0bcb8aa

@ -8,6 +8,7 @@ replace github.com/synctv-org/vendors => ./vendors
require (
github.com/Boostport/mjml-go v0.15.0
github.com/andybalholm/brotli v1.1.1
github.com/caarlos0/env/v9 v9.0.0
github.com/cavaliergopher/grab/v3 v3.0.1
github.com/emersion/go-sasl v0.0.0-20241020182733-b788ff22d5a6
@ -62,7 +63,6 @@ require (
cloud.google.com/go/compute/metadata v0.6.0 // indirect
filippo.io/edwards25519 v1.1.0 // indirect
github.com/BurntSushi/toml v1.3.2 // indirect
github.com/andybalholm/brotli v1.1.1 // indirect
github.com/armon/go-metrics v0.4.1 // indirect
github.com/bytedance/sonic v1.12.6 // indirect
github.com/bytedance/sonic/loader v0.2.1 // indirect

@ -74,6 +74,8 @@ type MovieBase struct {
Type string `json:"type"`
ParentID EmptyNullString `gorm:"type:char(32)" json:"parentId"`
MoreSources []*MoreSource `gorm:"serializer:fastjson;type:text" json:"moreSources,omitempty"`
Danmu string `gorm:"type:varchar(8192)" json:"danmu"`
StreamDanmu string `gorm:"type:varchar(8192)" json:"streamDanmu"`
Live bool `json:"live"`
Proxy bool `json:"proxy"`
RtmpSource bool `json:"rtmpSource"`

@ -19,9 +19,7 @@ func LoadBilibiliClient(name string) BilibiliInterface {
return bilibiliLocalClient
}
var (
bilibiliLocalClient BilibiliInterface
)
var bilibiliLocalClient BilibiliInterface
func init() {
bilibiliLocalClient = bilibiliService.NewBilibiliService(nil)
@ -50,6 +48,10 @@ func newGrpcBilibili(client bilibili.BilibiliClient) BilibiliInterface {
}
}
func (g *grpcBilibili) GetLiveDanmuInfo(ctx context.Context, in *bilibili.GetLiveDanmuInfoReq) (*bilibili.GetLiveDanmuInfoResp, error) {
return g.client.GetLiveDanmuInfo(ctx, in)
}
func (g *grpcBilibili) NewQRCode(ctx context.Context, in *bilibili.Empty) (*bilibili.NewQRCodeResp, error) {
return g.client.NewQRCode(ctx, in)
}

@ -0,0 +1,57 @@
package handlers
import (
"context"
"net/http"
"github.com/gin-gonic/gin"
log "github.com/sirupsen/logrus"
"github.com/synctv-org/synctv/internal/op"
"github.com/synctv-org/synctv/server/handlers/vendors"
"github.com/synctv-org/synctv/server/model"
)
func StreamDanmu(ctx *gin.Context) {
log := ctx.MustGet("log").(*log.Entry)
room := ctx.MustGet("room").(*op.RoomEntry).Value()
// user := ctx.MustGet("user").(*op.UserEntry).Value()
m, err := room.GetMovieByID(ctx.Param("movieId"))
if err != nil {
log.Errorf("get movie by id error: %v", err)
ctx.AbortWithStatusJSON(http.StatusBadRequest, model.NewAPIErrorResp(err))
return
}
v, err := vendors.NewVendorService(room, m)
if err != nil {
log.Errorf("new vendor service error: %v", err)
ctx.AbortWithStatusJSON(http.StatusBadRequest, model.NewAPIErrorResp(err))
return
}
danmu, ok := v.(vendors.VendorDanmuService)
if !ok {
log.Errorf("vendor %s not support danmu", m.VendorInfo.Vendor)
ctx.AbortWithStatusJSON(http.StatusBadRequest, model.NewAPIErrorStringResp("vendor not support danmu"))
return
}
c, cancel := context.WithCancel(ctx.Request.Context())
defer cancel()
err = danmu.StreamDanmu(c, func(danmu string) error {
ctx.SSEvent("danmu", danmu)
if err := ctx.Err(); err != nil {
return err
}
ctx.Writer.Flush()
return nil
})
if err != nil {
log.Errorf("stream danmu error: %v", err)
ctx.AbortWithStatusJSON(http.StatusInternalServerError, model.NewAPIErrorResp(err))
return
}
}

@ -231,6 +231,8 @@ func initMovie(movie *gin.RouterGroup, needAuthMovie *gin.RouterGroup) {
live.GET("/hls/data/:roomId/:movieId/:dataId", ServeHlsLive)
}
needAuthMovie.GET("/danmu/:movieId", StreamDanmu)
}
func initUser(user *gin.RouterGroup, needAuthUser *gin.RouterGroup) {

@ -214,6 +214,8 @@ func (s *BilibiliVendorService) GenMovieInfo(ctx context.Context, user *op.User,
if movie.MovieBase.Live {
movie.MovieBase.URL = fmt.Sprintf("/api/room/movie/proxy/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID)
movie.MovieBase.Type = "m3u8"
movie.MovieBase.StreamDanmu = fmt.Sprintf("/api/room/movie/danmu/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID)
return movie, nil
}
@ -260,6 +262,8 @@ func (s *BilibiliVendorService) GenProxyMovieInfo(ctx context.Context, user *op.
if movie.MovieBase.Live {
movie.MovieBase.URL = fmt.Sprintf("/api/room/movie/proxy/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID)
movie.MovieBase.Type = "m3u8"
movie.MovieBase.StreamDanmu = fmt.Sprintf("/api/room/movie/danmu/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID)
return movie, nil
}

@ -214,6 +214,8 @@ func (s *BilibiliVendorService) GenMovieInfo(ctx context.Context, user *op.User,
if movie.MovieBase.Live {
movie.MovieBase.URL = fmt.Sprintf("/api/room/movie/proxy/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID)
movie.MovieBase.Type = "m3u8"
movie.MovieBase.StreamDanmu = fmt.Sprintf("/api/room/movie/danmu/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID)
return movie, nil
}
@ -260,6 +262,8 @@ func (s *BilibiliVendorService) GenProxyMovieInfo(ctx context.Context, user *op.
if movie.MovieBase.Live {
movie.MovieBase.URL = fmt.Sprintf("/api/room/movie/proxy/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID)
movie.MovieBase.Type = "m3u8"
movie.MovieBase.StreamDanmu = fmt.Sprintf("/api/room/movie/danmu/%s?token=%s&roomId=%s", movie.ID, userToken, movie.RoomID)
return movie, nil
}

@ -0,0 +1,242 @@
package vendorbilibili
import (
"bytes"
"compress/zlib"
"context"
"encoding/binary"
"errors"
"fmt"
"io"
"net/http"
"time"
"github.com/andybalholm/brotli"
"github.com/gorilla/websocket"
json "github.com/json-iterator/go"
log "github.com/sirupsen/logrus"
"github.com/synctv-org/synctv/internal/vendor"
"github.com/synctv-org/synctv/utils"
"github.com/synctv-org/vendors/api/bilibili"
)
type command uint32
const (
CMD_HEARTBEAT command = 2
CMD_HEARTBEAT_REPLY command = 3
CMD_NORMAL command = 5
CMD_AUTH command = 7
CMD_AUTH_REPLY command = 8
)
type header struct {
TotalSize uint32
HeaderLen uint16
Version uint16
Command command
Sequence uint32
}
var headerLen = binary.Size(header{})
func (h *header) Marshal() ([]byte, error) {
buf := bytes.NewBuffer(make([]byte, 0, headerLen))
err := binary.Write(buf, binary.BigEndian, h)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}
func (h *header) Unmarshal(data []byte) error {
return binary.Read(bytes.NewReader(data), binary.BigEndian, h)
}
func newHeader(size uint32, command command, sequence uint32) header {
h := header{
TotalSize: uint32(headerLen) + size,
HeaderLen: uint16(headerLen),
Command: command,
Sequence: sequence,
}
switch command {
case CMD_HEARTBEAT, CMD_AUTH:
h.Version = 1
}
return h
}
type verifyHello struct {
UID int64 `json:"uid"`
RoomID uint64 `json:"roomid,omitempty"`
ProtoVer int `json:"protover,omitempty"`
Platform string `json:"platform,omitempty"`
Type int `json:"type,omitempty"`
Key string `json:"key,omitempty"`
}
func newVerifyHello(roomID uint64, key string) *verifyHello {
return &verifyHello{
RoomID: roomID,
ProtoVer: 3,
Platform: "web",
Type: 2,
Key: key,
}
}
func writeVerifyHello(conn *websocket.Conn, hello *verifyHello) error {
msg, err := json.Marshal(hello)
if err != nil {
return err
}
header := newHeader(uint32(len(msg)), CMD_AUTH, 1)
headerBytes, err := header.Marshal()
if err != nil {
return err
}
return conn.WriteMessage(websocket.BinaryMessage, append(headerBytes, msg...))
}
func writeHeartbeat(conn *websocket.Conn, sequence uint32) error {
header := newHeader(0, CMD_HEARTBEAT, sequence)
headerBytes, err := header.Marshal()
if err != nil {
return err
}
return conn.WriteMessage(websocket.BinaryMessage, headerBytes)
}
type replyCmd struct {
Cmd string `json:"cmd"`
}
func (v *BilibiliVendorService) StreamDanmu(ctx context.Context, handler func(danmu string) error) error {
resp, err := vendor.LoadBilibiliClient("").GetLiveDanmuInfo(ctx, &bilibili.GetLiveDanmuInfoReq{
RoomID: v.movie.VendorInfo.Bilibili.Cid,
})
if err != nil {
return err
}
if len(resp.HostList) == 0 {
return errors.New("no host list")
}
wssHost := resp.HostList[0].Host
wssPort := resp.HostList[0].WssPort
conn, _, err := websocket.
DefaultDialer.
DialContext(
ctx,
fmt.Sprintf("wss://%s:%d/sub", wssHost, wssPort),
http.Header{
"User-Agent": []string{utils.UA},
"Origin": []string{"https://live.bilibili.com"},
},
)
if err != nil {
return err
}
defer conn.Close()
err = writeVerifyHello(
conn,
newVerifyHello(
v.movie.VendorInfo.Bilibili.Cid,
resp.Token,
),
)
if err != nil {
return err
}
_, _, err = conn.ReadMessage()
if err != nil {
return err
}
go func() {
ticker := time.NewTicker(time.Second * 20)
defer ticker.Stop()
sequence := uint32(1)
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
sequence++
err = writeHeartbeat(conn, sequence)
if err != nil {
log.Errorf("write heartbeat error: %v", err)
}
}
}
}()
for {
select {
case <-ctx.Done():
return ctx.Err()
default:
_, message, err := conn.ReadMessage()
if err != nil {
return err
}
header := header{}
err = header.Unmarshal(message[:headerLen])
if err != nil {
return err
}
switch header.Command {
case CMD_HEARTBEAT_REPLY:
continue
}
data := message[headerLen:]
switch header.Version {
case 2:
// zlib
zlibReader, err := zlib.NewReader(bytes.NewReader(data))
if err != nil {
return err
}
defer zlibReader.Close()
data, err = io.ReadAll(zlibReader)
if err != nil {
return err
}
case 3:
// brotli
brotliReader := brotli.NewReader(bytes.NewReader(data))
data, err = io.ReadAll(brotliReader)
if err != nil {
return err
}
data = data[headerLen:]
}
reply := replyCmd{}
err = json.Unmarshal(data, &reply)
if err != nil {
return err
}
switch reply.Cmd {
case "DANMU_MSG":
danmu := danmuMsg{}
err = json.Unmarshal(data, &danmu)
if err != nil {
return err
}
content, ok := danmu.Info[1].(string)
if !ok {
return errors.New("content is not string")
}
handler(content)
case "DM_INTERACTION":
}
}
}
}
type danmuMsg struct {
Info []any `json:"info"`
}

@ -38,6 +38,10 @@ type VendorService interface {
GenMovieInfo(ctx context.Context, reqUser *op.User, userAgent, userToken string) (*dbModel.Movie, error)
}
type VendorDanmuService interface {
StreamDanmu(ctx context.Context, handler func(danmu string) error) error
}
func NewVendorService(room *op.Room, movie *op.Movie) (VendorService, error) {
switch movie.VendorInfo.Vendor {
case dbModel.VendorBilibili:

@ -1 +1 @@
Subproject commit f737630257f56b6c703853a36d626786fe2a6bd9
Subproject commit 06afd9cf4a261f8395d09f7859db9ec40abc67c7

@ -1 +1 @@
Subproject commit e31a7149e750fc145c2559b7023082dd16718726
Subproject commit 54947d824dd71a2bc3aff1ba572470cd31261dec
Loading…
Cancel
Save