diff --git a/room/client.go b/room/client.go index 10b31c6..0a2b5b5 100644 --- a/room/client.go +++ b/room/client.go @@ -16,7 +16,7 @@ type Client struct { wg sync.WaitGroup conn *websocket.Conn timeOut time.Duration - closed uint64 + closed uint32 } func NewClient(user *User, conn *websocket.Conn) (*Client, error) { @@ -65,7 +65,7 @@ func (c *Client) Unregister() error { } func (c *Client) Close() error { - if !atomic.CompareAndSwapUint64(&c.closed, 0, 1) { + if !atomic.CompareAndSwapUint32(&c.closed, 0, 1) { return ErrAlreadyClosed } c.wg.Wait() @@ -74,7 +74,7 @@ func (c *Client) Close() error { } func (c *Client) Closed() bool { - return atomic.LoadUint64(&c.closed) == 1 + return atomic.LoadUint32(&c.closed) == 1 } func (c *Client) GetReadChan() <-chan Message { diff --git a/room/current.go b/room/current.go index 0ff345b..a13a598 100644 --- a/room/current.go +++ b/room/current.go @@ -10,14 +10,13 @@ import ( type Current struct { movie MovieInfo status Status - lock *sync.RWMutex + lock sync.RWMutex } func newCurrent() *Current { return &Current{ movie: MovieInfo{}, status: newStatus(), - lock: new(sync.RWMutex), } } diff --git a/room/hub.go b/room/hub.go index 62b78de..eb27226 100644 --- a/room/hub.go +++ b/room/hub.go @@ -15,8 +15,7 @@ import ( type hub struct { id string - clientNum int64 - clients *rwmap.RWMap[string, *Client] + clients rwmap.RWMap[string, *Client] broadcast chan *broadcastMessage exit chan struct{} closed uint32 @@ -54,7 +53,7 @@ func newHub(id string) *hub { return &hub{ id: id, broadcast: make(chan *broadcastMessage, 128), - clients: &rwmap.RWMap[string, *Client]{}, + clients: rwmap.RWMap[string, *Client]{}, exit: make(chan struct{}), } } @@ -156,7 +155,6 @@ func (h *hub) RegClient(user *User, conn *websocket.Conn) (*Client, error) { if loaded { return nil, errors.New("client already registered") } - atomic.AddInt64(&h.clientNum, 1) return c, nil } @@ -171,10 +169,9 @@ func (h *hub) UnRegClient(user *User) error { if !loaded { return errors.New("client not found") } - atomic.AddInt64(&h.clientNum, -1) return nil } func (h *hub) ClientNum() int64 { - return atomic.LoadInt64(&h.clientNum) + return h.clients.Len() } diff --git a/room/movies.go b/room/movies.go index f0c23ba..19884f5 100644 --- a/room/movies.go +++ b/room/movies.go @@ -24,7 +24,7 @@ func (e FormatErrMovieAlreadyExist) Error() string { type movies struct { l *dllist.Dllist[*Movie] - lock *sync.RWMutex + lock sync.RWMutex } // Url will be `PullKey` when Live and Proxy are true @@ -128,7 +128,7 @@ func (m *Movie) SetChannel(channel *rtmps.Channel) { } func newMovies() *movies { - return &movies{l: dllist.New[*Movie](), lock: &sync.RWMutex{}} + return &movies{l: dllist.New[*Movie]()} } func (m *movies) Range(f func(e *dllist.Element[*Movie]) bool) (interrupt bool) { diff --git a/room/room.go b/room/room.go index 4c95fb9..0130496 100644 --- a/room/room.go +++ b/room/room.go @@ -20,25 +20,22 @@ var ( ) type Room struct { - id string - lock *sync.RWMutex - password []byte - needPassword bool - version uint64 - current *Current - maxInactivityTime time.Duration - lastActive time.Time - rtmps *rtmps.Server - rtmpa *rtmps.App - hidden bool - timer *time.Timer - inited bool - users *rwmap.RWMap[string, *User] - rootUser *User - createAt time.Time - mid uint64 + id string + password []byte + needPassword uint32 + version uint64 + current *Current + rtmps *rtmps.Server + rtmpa *rtmps.App + hidden uint32 + initOnce sync.Once + users rwmap.RWMap[string, *User] + rootUser *User + lastActive int64 + createAt int64 + mid uint64 + hub *hub *movies - *hub } type RoomConf func(r *Room) @@ -49,15 +46,9 @@ func WithVersion(version uint64) RoomConf { } } -func WithMaxInactivityTime(maxInactivityTime time.Duration) RoomConf { - return func(r *Room) { - r.maxInactivityTime = maxInactivityTime - } -} - func WithHidden(hidden bool) RoomConf { return func(r *Room) { - r.hidden = hidden + r.SetHidden(hidden) } } @@ -75,18 +66,12 @@ func NewRoom(RoomID string, Password string, rtmps *rtmps.Server, conf ...RoomCo if RoomID == "" { return nil, ErrRoomIDEmpty } - + now := time.Now().UnixMilli() r := &Room{ - id: RoomID, - lock: new(sync.RWMutex), - movies: newMovies(), - current: newCurrent(), - maxInactivityTime: 12 * time.Hour, - lastActive: time.Now(), - hub: newHub(RoomID), - rtmps: rtmps, - users: &rwmap.RWMap[string, *User]{}, - createAt: time.Now(), + id: RoomID, + rtmps: rtmps, + lastActive: now, + createAt: now, } for _, c := range conf { @@ -94,14 +79,23 @@ func NewRoom(RoomID string, Password string, rtmps *rtmps.Server, conf ...RoomCo } if r.version == 0 { - r.version = rand.New(rand.NewSource(time.Now().UnixNano())).Uint64() + r.version = rand.New(rand.NewSource(now)).Uint64() } return r, r.SetPassword(Password) } -func (r *Room) CreateAt() time.Time { - return r.createAt +func (r *Room) Init() { + r.initOnce.Do(func() { + r.rtmpa = r.rtmps.GetOrNewApp(r.id) + r.hub = newHub(r.id) + r.movies = newMovies() + r.current = newCurrent() + }) +} + +func (r *Room) CreateAt() int64 { + return atomic.LoadInt64(&r.createAt) } func (r *Room) RootUser() *User { @@ -183,28 +177,10 @@ func (r *Room) Start() { } func (r *Room) Serve() { - r.init() r.hub.Serve() } -func (r *Room) init() { - r.lock.Lock() - defer r.lock.Unlock() - if r.inited { - return - } - r.inited = true - if r.maxInactivityTime != 0 { - r.timer = time.AfterFunc(time.Duration(r.maxInactivityTime), func() { - r.Close() - }) - } - r.rtmpa = r.rtmps.GetOrNewApp(r.id) -} - func (r *Room) Close() error { - r.lock.Lock() - defer r.lock.Unlock() if err := r.hub.Close(); err != nil { return err } @@ -212,22 +188,19 @@ func (r *Room) Close() error { if err != nil { return err } - if r.timer != nil { - r.timer.Stop() - } return nil } func (r *Room) SetHidden(hidden bool) { - r.lock.Lock() - defer r.lock.Unlock() - r.hidden = hidden + if hidden { + atomic.StoreUint32(&r.hidden, 1) + } else { + atomic.StoreUint32(&r.hidden, 0) + } } func (r *Room) Hidden() bool { - r.lock.RLock() - defer r.lock.RUnlock() - return r.hidden + return atomic.LoadUint32(&r.hidden) == 1 } func (r *Room) ID() string { @@ -235,45 +208,34 @@ func (r *Room) ID() string { } func (r *Room) UpdateActiveTime() { - r.lock.Lock() - defer r.lock.Unlock() - r.updateActiveTime() -} - -func (r *Room) updateActiveTime() { - if r.maxInactivityTime != 0 { - r.timer.Reset(r.maxInactivityTime) - } - r.lastActive = time.Now() + atomic.StoreInt64(&r.lastActive, time.Now().UnixMilli()) } -func (r *Room) ResetMaxInactivityTime(maxInactivityTime time.Duration) { - r.lock.Lock() - defer r.lock.Unlock() - r.maxInactivityTime = maxInactivityTime - r.updateActiveTime() -} - -func (r *Room) LateActiveTime() time.Time { - r.lock.RLock() - defer r.lock.RUnlock() - return r.lastActive +func (r *Room) LateActiveTime() int64 { + return atomic.LoadInt64(&r.lastActive) } func (r *Room) SetPassword(password string) error { - r.lock.Lock() - defer r.lock.Unlock() if password != "" { b, err := bcrypt.GenerateFromPassword(stream.StringToBytes(password), bcrypt.DefaultCost) if err != nil { return err } r.password = b - r.needPassword = true + atomic.StoreUint32(&r.needPassword, 1) } else { - r.needPassword = false + atomic.StoreUint32(&r.needPassword, 0) + r.password = nil } r.updateVersion() + return nil +} + +func (r *Room) SetPasswordAndCloseAll(password string) error { + err := r.SetPassword(password) + if err != nil { + return err + } r.hub.clients.Range(func(_ string, value *Client) bool { value.Close() return true @@ -282,18 +244,14 @@ func (r *Room) SetPassword(password string) error { } func (r *Room) CheckPassword(password string) (ok bool) { - r.lock.RLock() - defer r.lock.RUnlock() - if !r.needPassword { + if !r.NeedPassword() { return true } return bcrypt.CompareHashAndPassword(r.password, stream.StringToBytes(password)) == nil } func (r *Room) NeedPassword() bool { - r.lock.RLock() - defer r.lock.RUnlock() - return r.needPassword + return atomic.LoadUint32(&r.needPassword) == 1 } func (r *Room) Version() uint64 { @@ -318,6 +276,7 @@ func (r *Room) Current() *Current { // Seek will be set to 0 func (r *Room) ChangeCurrentMovie(id uint64) error { + r.UpdateActiveTime() e, err := r.movies.getMovie(id) if err != nil { return err @@ -353,22 +312,19 @@ func (r *Room) PushBackMovie(movie *Movie) error { if r.hub.Closed() { return ErrAlreadyClosed } + r.UpdateActiveTime() return r.movies.PushBackMovie(movie) } func (r *Room) PushFrontMovie(movie *Movie) error { - if r.hub.Closed() { - return ErrAlreadyClosed - } + r.UpdateActiveTime() return r.movies.PushFrontMovie(movie) } func (r *Room) DelMovie(id ...uint64) error { - if r.hub.Closed() { - return ErrAlreadyClosed - } + r.UpdateActiveTime() m, err := r.movies.GetAndDelMovie(id...) if err != nil { return err @@ -377,15 +333,13 @@ func (r *Room) DelMovie(id ...uint64) error { } func (r *Room) ClearMovies() (err error) { - if r.hub.Closed() { - return ErrAlreadyClosed - } + r.UpdateActiveTime() return r.closeLive(r.movies.GetAndClear()) } func (r *Room) closeLive(m []*Movie) error { for _, m := range m { - if m.Live { + if m.RtmpSource || (m.Proxy && m.Live) { if err := r.rtmpa.DelChannel(m.PullKey); err != nil { return err } @@ -395,9 +349,7 @@ func (r *Room) closeLive(m []*Movie) error { } func (r *Room) SwapMovie(id1, id2 uint64) error { - if r.hub.Closed() { - return ErrAlreadyClosed - } + r.UpdateActiveTime() return r.movies.SwapMovie(id1, id2) } @@ -407,6 +359,20 @@ func (r *Room) Broadcast(msg Message, conf ...BroadcastConf) error { } func (r *Room) RegClient(user *User, conn *websocket.Conn) (*Client, error) { - r.updateActiveTime() + r.UpdateActiveTime() return r.hub.RegClient(user, conn) } + +func (r *Room) UnRegClient(user *User) error { + r.UpdateActiveTime() + return r.hub.UnRegClient(user) +} + +func (r *Room) Closed() bool { + return r.hub.Closed() +} + +func (r *Room) ClientNum() int64 { + r.UpdateActiveTime() + return r.hub.ClientNum() +} diff --git a/room/user.go b/room/user.go index 240dc72..c1a9d87 100644 --- a/room/user.go +++ b/room/user.go @@ -149,8 +149,7 @@ func (u *User) NewMovie(url string, name string, type_ string, live bool, proxy } func (u *User) NewMovieWithBaseMovie(baseMovie BaseMovie, conf ...MovieConf) (*Movie, error) { - conf = append(conf, WithCreator(u)) - return NewMovieWithBaseMovie(atomic.AddUint64(&u.room.mid, 1), baseMovie, conf...) + return NewMovieWithBaseMovie(atomic.AddUint64(&u.room.mid, 1), baseMovie, append(conf, WithCreator(u))...) } func (u *User) Movie(id uint64) (*MovieInfo, error) { diff --git a/server/handlers/api-room.go b/server/handlers/api-room.go index 3c0eff0..31bdbb1 100644 --- a/server/handlers/api-room.go +++ b/server/handlers/api-room.go @@ -145,7 +145,6 @@ func NewCreateRoomHandler(s *rtmps.Server) gin.HandlerFunc { r, err := Rooms.CreateRoom(req.RoomID, req.Password, s, room.WithHidden(req.Hidden), - room.WithMaxInactivityTime(roomMaxInactivityTime), room.WithRootUser(user), ) if err != nil { @@ -159,6 +158,9 @@ func NewCreateRoomHandler(s *rtmps.Server) gin.HandlerFunc { return } + r.Init() + r.Start() + go func() { ticker := time.NewTicker(time.Second * 5) defer ticker.Stop() @@ -187,10 +189,6 @@ func NewCreateRoomHandler(s *rtmps.Server) gin.HandlerFunc { } }() - r.Start() - - r.SetRootUser(user) - ctx.JSON(http.StatusCreated, NewApiDataResp(gin.H{ "token": token, })) @@ -218,7 +216,7 @@ func RoomList(ctx *gin.Context) { PeopleNum: v.ClientNum(), NeedPassword: v.NeedPassword(), Creator: v.RootUser().Name(), - CreateAt: v.CreateAt().UnixMilli(), + CreateAt: v.CreateAt(), }) } @@ -364,8 +362,14 @@ func DeleteRoom(ctx *gin.Context) { return } - if err := Rooms.DelRoom(user.Room().ID()); err != nil { - ctx.AbortWithStatusJSON(http.StatusBadRequest, NewApiErrorResp(err)) + if !user.IsRoot() { + ctx.AbortWithStatusJSON(http.StatusUnauthorized, NewApiErrorStringResp("only root can close room")) + return + } + + err = Rooms.DelRoom(user.Room().ID()) + if err != nil { + ctx.AbortWithStatusJSON(http.StatusInternalServerError, NewApiErrorResp(err)) return } diff --git a/server/handlers/rooms.go b/server/handlers/rooms.go index cf68ce6..a6825e7 100644 --- a/server/handlers/rooms.go +++ b/server/handlers/rooms.go @@ -7,7 +7,6 @@ import ( "github.com/synctv-org/synctv/room" "github.com/zijiren233/gencontainer/rwmap" - "github.com/zijiren233/ksync" rtmps "github.com/zijiren233/livelib/server" ) @@ -33,26 +32,16 @@ var ( ) type rooms struct { - rooms *rwmap.RWMap[string, *room.Room] - lock *ksync.Krwmutex + rooms rwmap.RWMap[string, *room.Room] } func newRooms() *rooms { - return &rooms{ - rooms: &rwmap.RWMap[string, *room.Room]{}, - lock: ksync.NewKrwmutex(), - } + return &rooms{} } func (rs *rooms) List() (rooms []*room.Room) { rooms = make([]*room.Room, 0, rs.rooms.Len()) rs.rooms.Range(func(id string, r *room.Room) bool { - rs.lock.RLock(id) - defer rs.lock.RUnlock(id) - if r.Closed() { - rs.rooms.Delete(id) - return true - } rooms = append(rooms, r) return true }) @@ -62,12 +51,6 @@ func (rs *rooms) List() (rooms []*room.Room) { func (rs *rooms) ListNonHidden() (rooms []*room.Room) { rooms = make([]*room.Room, 0, rs.rooms.Len()) rs.rooms.Range(func(id string, r *room.Room) bool { - rs.lock.RLock(id) - defer rs.lock.RUnlock(id) - if r.Closed() { - rs.rooms.Delete(id) - return true - } if !r.Hidden() { rooms = append(rooms, r) } @@ -79,12 +62,6 @@ func (rs *rooms) ListNonHidden() (rooms []*room.Room) { func (rs *rooms) ListHidden() (rooms []*room.Room) { rooms = make([]*room.Room, 0, rs.rooms.Len()) rs.rooms.Range(func(id string, r *room.Room) bool { - rs.lock.RLock(id) - defer rs.lock.RUnlock(id) - if r.Closed() { - rs.rooms.Delete(id) - return true - } if r.Hidden() { rooms = append(rooms, r) } @@ -94,12 +71,7 @@ func (rs *rooms) ListHidden() (rooms []*room.Room) { } func (rs *rooms) HasRoom(id string) bool { - rs.lock.Lock(id) - defer rs.lock.Unlock(id) - r, ok := rs.rooms.Load(id) - if !ok || r.Closed() { - return false - } + _, ok := rs.rooms.Load(id) return ok } @@ -107,30 +79,22 @@ func (rs *rooms) GetRoom(id string) (*room.Room, error) { if id == "" { return nil, ErrRoomIDEmpty } - rs.lock.RLock(id) - defer rs.lock.RUnlock(id) r, ok := rs.rooms.Load(id) - if !ok || r.Closed() { + if !ok { return nil, ErrRoomNotFound } return r, nil } func (rs *rooms) CreateRoom(id string, password string, s *rtmps.Server, conf ...room.RoomConf) (*room.Room, error) { - if id == "" { - return nil, ErrRoomIDEmpty - } - rs.lock.Lock(id) - defer rs.lock.Unlock(id) - - if oldR, ok := rs.rooms.Load(id); ok && !oldR.Closed() { - return nil, ErrRoomAlreadyExist - } r, err := room.NewRoom(id, password, s, conf...) if err != nil { return nil, err } - rs.rooms.Store(id, r) + r, loaded := rs.rooms.LoadOrStore(r.ID(), r) + if loaded { + return nil, ErrRoomAlreadyExist + } return r, nil } @@ -138,8 +102,6 @@ func (rs *rooms) DelRoom(id string) error { if id == "" { return ErrRoomIDEmpty } - rs.lock.Lock(id) - defer rs.lock.Unlock(id) r, ok := rs.rooms.LoadAndDelete(id) if !ok { return ErrRoomNotFound diff --git a/server/middlewares/init.go b/server/middlewares/init.go index f2f3f98..85fe109 100644 --- a/server/middlewares/init.go +++ b/server/middlewares/init.go @@ -7,8 +7,9 @@ import ( ) func Init(e *gin.Engine) { + w := log.StandardLogger().Writer() e. - Use(gin.LoggerWithWriter(log.StandardLogger().Out), gin.RecoveryWithWriter(log.StandardLogger().Out)). + Use(gin.LoggerWithWriter(w), gin.RecoveryWithWriter(w)). Use(NewCors()) if conf.Conf.Server.Quic { e.Use(NewQuic())