Feat: no race and no more locker

pull/21/head
zijiren233 2 years ago
parent 0f34b80066
commit 15663582b0

@ -16,7 +16,7 @@ type Client struct {
wg sync.WaitGroup wg sync.WaitGroup
conn *websocket.Conn conn *websocket.Conn
timeOut time.Duration timeOut time.Duration
closed uint64 closed uint32
} }
func NewClient(user *User, conn *websocket.Conn) (*Client, error) { func NewClient(user *User, conn *websocket.Conn) (*Client, error) {
@ -65,7 +65,7 @@ func (c *Client) Unregister() error {
} }
func (c *Client) Close() error { func (c *Client) Close() error {
if !atomic.CompareAndSwapUint64(&c.closed, 0, 1) { if !atomic.CompareAndSwapUint32(&c.closed, 0, 1) {
return ErrAlreadyClosed return ErrAlreadyClosed
} }
c.wg.Wait() c.wg.Wait()
@ -74,7 +74,7 @@ func (c *Client) Close() error {
} }
func (c *Client) Closed() bool { func (c *Client) Closed() bool {
return atomic.LoadUint64(&c.closed) == 1 return atomic.LoadUint32(&c.closed) == 1
} }
func (c *Client) GetReadChan() <-chan Message { func (c *Client) GetReadChan() <-chan Message {

@ -10,14 +10,13 @@ import (
type Current struct { type Current struct {
movie MovieInfo movie MovieInfo
status Status status Status
lock *sync.RWMutex lock sync.RWMutex
} }
func newCurrent() *Current { func newCurrent() *Current {
return &Current{ return &Current{
movie: MovieInfo{}, movie: MovieInfo{},
status: newStatus(), status: newStatus(),
lock: new(sync.RWMutex),
} }
} }

@ -15,8 +15,7 @@ import (
type hub struct { type hub struct {
id string id string
clientNum int64 clients rwmap.RWMap[string, *Client]
clients *rwmap.RWMap[string, *Client]
broadcast chan *broadcastMessage broadcast chan *broadcastMessage
exit chan struct{} exit chan struct{}
closed uint32 closed uint32
@ -54,7 +53,7 @@ func newHub(id string) *hub {
return &hub{ return &hub{
id: id, id: id,
broadcast: make(chan *broadcastMessage, 128), broadcast: make(chan *broadcastMessage, 128),
clients: &rwmap.RWMap[string, *Client]{}, clients: rwmap.RWMap[string, *Client]{},
exit: make(chan struct{}), exit: make(chan struct{}),
} }
} }
@ -156,7 +155,6 @@ func (h *hub) RegClient(user *User, conn *websocket.Conn) (*Client, error) {
if loaded { if loaded {
return nil, errors.New("client already registered") return nil, errors.New("client already registered")
} }
atomic.AddInt64(&h.clientNum, 1)
return c, nil return c, nil
} }
@ -171,10 +169,9 @@ func (h *hub) UnRegClient(user *User) error {
if !loaded { if !loaded {
return errors.New("client not found") return errors.New("client not found")
} }
atomic.AddInt64(&h.clientNum, -1)
return nil return nil
} }
func (h *hub) ClientNum() int64 { func (h *hub) ClientNum() int64 {
return atomic.LoadInt64(&h.clientNum) return h.clients.Len()
} }

@ -24,7 +24,7 @@ func (e FormatErrMovieAlreadyExist) Error() string {
type movies struct { type movies struct {
l *dllist.Dllist[*Movie] l *dllist.Dllist[*Movie]
lock *sync.RWMutex lock sync.RWMutex
} }
// Url will be `PullKey` when Live and Proxy are true // Url will be `PullKey` when Live and Proxy are true
@ -128,7 +128,7 @@ func (m *Movie) SetChannel(channel *rtmps.Channel) {
} }
func newMovies() *movies { func newMovies() *movies {
return &movies{l: dllist.New[*Movie](), lock: &sync.RWMutex{}} return &movies{l: dllist.New[*Movie]()}
} }
func (m *movies) Range(f func(e *dllist.Element[*Movie]) bool) (interrupt bool) { func (m *movies) Range(f func(e *dllist.Element[*Movie]) bool) (interrupt bool) {

@ -20,25 +20,22 @@ var (
) )
type Room struct { type Room struct {
id string id string
lock *sync.RWMutex password []byte
password []byte needPassword uint32
needPassword bool version uint64
version uint64 current *Current
current *Current rtmps *rtmps.Server
maxInactivityTime time.Duration rtmpa *rtmps.App
lastActive time.Time hidden uint32
rtmps *rtmps.Server initOnce sync.Once
rtmpa *rtmps.App users rwmap.RWMap[string, *User]
hidden bool rootUser *User
timer *time.Timer lastActive int64
inited bool createAt int64
users *rwmap.RWMap[string, *User] mid uint64
rootUser *User hub *hub
createAt time.Time
mid uint64
*movies *movies
*hub
} }
type RoomConf func(r *Room) type RoomConf func(r *Room)
@ -49,15 +46,9 @@ func WithVersion(version uint64) RoomConf {
} }
} }
func WithMaxInactivityTime(maxInactivityTime time.Duration) RoomConf {
return func(r *Room) {
r.maxInactivityTime = maxInactivityTime
}
}
func WithHidden(hidden bool) RoomConf { func WithHidden(hidden bool) RoomConf {
return func(r *Room) { return func(r *Room) {
r.hidden = hidden r.SetHidden(hidden)
} }
} }
@ -75,18 +66,12 @@ func NewRoom(RoomID string, Password string, rtmps *rtmps.Server, conf ...RoomCo
if RoomID == "" { if RoomID == "" {
return nil, ErrRoomIDEmpty return nil, ErrRoomIDEmpty
} }
now := time.Now().UnixMilli()
r := &Room{ r := &Room{
id: RoomID, id: RoomID,
lock: new(sync.RWMutex), rtmps: rtmps,
movies: newMovies(), lastActive: now,
current: newCurrent(), createAt: now,
maxInactivityTime: 12 * time.Hour,
lastActive: time.Now(),
hub: newHub(RoomID),
rtmps: rtmps,
users: &rwmap.RWMap[string, *User]{},
createAt: time.Now(),
} }
for _, c := range conf { for _, c := range conf {
@ -94,14 +79,23 @@ func NewRoom(RoomID string, Password string, rtmps *rtmps.Server, conf ...RoomCo
} }
if r.version == 0 { if r.version == 0 {
r.version = rand.New(rand.NewSource(time.Now().UnixNano())).Uint64() r.version = rand.New(rand.NewSource(now)).Uint64()
} }
return r, r.SetPassword(Password) return r, r.SetPassword(Password)
} }
func (r *Room) CreateAt() time.Time { func (r *Room) Init() {
return r.createAt r.initOnce.Do(func() {
r.rtmpa = r.rtmps.GetOrNewApp(r.id)
r.hub = newHub(r.id)
r.movies = newMovies()
r.current = newCurrent()
})
}
func (r *Room) CreateAt() int64 {
return atomic.LoadInt64(&r.createAt)
} }
func (r *Room) RootUser() *User { func (r *Room) RootUser() *User {
@ -183,28 +177,10 @@ func (r *Room) Start() {
} }
func (r *Room) Serve() { func (r *Room) Serve() {
r.init()
r.hub.Serve() r.hub.Serve()
} }
func (r *Room) init() {
r.lock.Lock()
defer r.lock.Unlock()
if r.inited {
return
}
r.inited = true
if r.maxInactivityTime != 0 {
r.timer = time.AfterFunc(time.Duration(r.maxInactivityTime), func() {
r.Close()
})
}
r.rtmpa = r.rtmps.GetOrNewApp(r.id)
}
func (r *Room) Close() error { func (r *Room) Close() error {
r.lock.Lock()
defer r.lock.Unlock()
if err := r.hub.Close(); err != nil { if err := r.hub.Close(); err != nil {
return err return err
} }
@ -212,22 +188,19 @@ func (r *Room) Close() error {
if err != nil { if err != nil {
return err return err
} }
if r.timer != nil {
r.timer.Stop()
}
return nil return nil
} }
func (r *Room) SetHidden(hidden bool) { func (r *Room) SetHidden(hidden bool) {
r.lock.Lock() if hidden {
defer r.lock.Unlock() atomic.StoreUint32(&r.hidden, 1)
r.hidden = hidden } else {
atomic.StoreUint32(&r.hidden, 0)
}
} }
func (r *Room) Hidden() bool { func (r *Room) Hidden() bool {
r.lock.RLock() return atomic.LoadUint32(&r.hidden) == 1
defer r.lock.RUnlock()
return r.hidden
} }
func (r *Room) ID() string { func (r *Room) ID() string {
@ -235,45 +208,34 @@ func (r *Room) ID() string {
} }
func (r *Room) UpdateActiveTime() { func (r *Room) UpdateActiveTime() {
r.lock.Lock() atomic.StoreInt64(&r.lastActive, time.Now().UnixMilli())
defer r.lock.Unlock()
r.updateActiveTime()
}
func (r *Room) updateActiveTime() {
if r.maxInactivityTime != 0 {
r.timer.Reset(r.maxInactivityTime)
}
r.lastActive = time.Now()
} }
func (r *Room) ResetMaxInactivityTime(maxInactivityTime time.Duration) { func (r *Room) LateActiveTime() int64 {
r.lock.Lock() return atomic.LoadInt64(&r.lastActive)
defer r.lock.Unlock()
r.maxInactivityTime = maxInactivityTime
r.updateActiveTime()
}
func (r *Room) LateActiveTime() time.Time {
r.lock.RLock()
defer r.lock.RUnlock()
return r.lastActive
} }
func (r *Room) SetPassword(password string) error { func (r *Room) SetPassword(password string) error {
r.lock.Lock()
defer r.lock.Unlock()
if password != "" { if password != "" {
b, err := bcrypt.GenerateFromPassword(stream.StringToBytes(password), bcrypt.DefaultCost) b, err := bcrypt.GenerateFromPassword(stream.StringToBytes(password), bcrypt.DefaultCost)
if err != nil { if err != nil {
return err return err
} }
r.password = b r.password = b
r.needPassword = true atomic.StoreUint32(&r.needPassword, 1)
} else { } else {
r.needPassword = false atomic.StoreUint32(&r.needPassword, 0)
r.password = nil
} }
r.updateVersion() r.updateVersion()
return nil
}
func (r *Room) SetPasswordAndCloseAll(password string) error {
err := r.SetPassword(password)
if err != nil {
return err
}
r.hub.clients.Range(func(_ string, value *Client) bool { r.hub.clients.Range(func(_ string, value *Client) bool {
value.Close() value.Close()
return true return true
@ -282,18 +244,14 @@ func (r *Room) SetPassword(password string) error {
} }
func (r *Room) CheckPassword(password string) (ok bool) { func (r *Room) CheckPassword(password string) (ok bool) {
r.lock.RLock() if !r.NeedPassword() {
defer r.lock.RUnlock()
if !r.needPassword {
return true return true
} }
return bcrypt.CompareHashAndPassword(r.password, stream.StringToBytes(password)) == nil return bcrypt.CompareHashAndPassword(r.password, stream.StringToBytes(password)) == nil
} }
func (r *Room) NeedPassword() bool { func (r *Room) NeedPassword() bool {
r.lock.RLock() return atomic.LoadUint32(&r.needPassword) == 1
defer r.lock.RUnlock()
return r.needPassword
} }
func (r *Room) Version() uint64 { func (r *Room) Version() uint64 {
@ -318,6 +276,7 @@ func (r *Room) Current() *Current {
// Seek will be set to 0 // Seek will be set to 0
func (r *Room) ChangeCurrentMovie(id uint64) error { func (r *Room) ChangeCurrentMovie(id uint64) error {
r.UpdateActiveTime()
e, err := r.movies.getMovie(id) e, err := r.movies.getMovie(id)
if err != nil { if err != nil {
return err return err
@ -353,22 +312,19 @@ func (r *Room) PushBackMovie(movie *Movie) error {
if r.hub.Closed() { if r.hub.Closed() {
return ErrAlreadyClosed return ErrAlreadyClosed
} }
r.UpdateActiveTime()
return r.movies.PushBackMovie(movie) return r.movies.PushBackMovie(movie)
} }
func (r *Room) PushFrontMovie(movie *Movie) error { func (r *Room) PushFrontMovie(movie *Movie) error {
if r.hub.Closed() { r.UpdateActiveTime()
return ErrAlreadyClosed
}
return r.movies.PushFrontMovie(movie) return r.movies.PushFrontMovie(movie)
} }
func (r *Room) DelMovie(id ...uint64) error { func (r *Room) DelMovie(id ...uint64) error {
if r.hub.Closed() { r.UpdateActiveTime()
return ErrAlreadyClosed
}
m, err := r.movies.GetAndDelMovie(id...) m, err := r.movies.GetAndDelMovie(id...)
if err != nil { if err != nil {
return err return err
@ -377,15 +333,13 @@ func (r *Room) DelMovie(id ...uint64) error {
} }
func (r *Room) ClearMovies() (err error) { func (r *Room) ClearMovies() (err error) {
if r.hub.Closed() { r.UpdateActiveTime()
return ErrAlreadyClosed
}
return r.closeLive(r.movies.GetAndClear()) return r.closeLive(r.movies.GetAndClear())
} }
func (r *Room) closeLive(m []*Movie) error { func (r *Room) closeLive(m []*Movie) error {
for _, m := range m { for _, m := range m {
if m.Live { if m.RtmpSource || (m.Proxy && m.Live) {
if err := r.rtmpa.DelChannel(m.PullKey); err != nil { if err := r.rtmpa.DelChannel(m.PullKey); err != nil {
return err return err
} }
@ -395,9 +349,7 @@ func (r *Room) closeLive(m []*Movie) error {
} }
func (r *Room) SwapMovie(id1, id2 uint64) error { func (r *Room) SwapMovie(id1, id2 uint64) error {
if r.hub.Closed() { r.UpdateActiveTime()
return ErrAlreadyClosed
}
return r.movies.SwapMovie(id1, id2) return r.movies.SwapMovie(id1, id2)
} }
@ -407,6 +359,20 @@ func (r *Room) Broadcast(msg Message, conf ...BroadcastConf) error {
} }
func (r *Room) RegClient(user *User, conn *websocket.Conn) (*Client, error) { func (r *Room) RegClient(user *User, conn *websocket.Conn) (*Client, error) {
r.updateActiveTime() r.UpdateActiveTime()
return r.hub.RegClient(user, conn) return r.hub.RegClient(user, conn)
} }
func (r *Room) UnRegClient(user *User) error {
r.UpdateActiveTime()
return r.hub.UnRegClient(user)
}
func (r *Room) Closed() bool {
return r.hub.Closed()
}
func (r *Room) ClientNum() int64 {
r.UpdateActiveTime()
return r.hub.ClientNum()
}

@ -149,8 +149,7 @@ func (u *User) NewMovie(url string, name string, type_ string, live bool, proxy
} }
func (u *User) NewMovieWithBaseMovie(baseMovie BaseMovie, conf ...MovieConf) (*Movie, error) { func (u *User) NewMovieWithBaseMovie(baseMovie BaseMovie, conf ...MovieConf) (*Movie, error) {
conf = append(conf, WithCreator(u)) return NewMovieWithBaseMovie(atomic.AddUint64(&u.room.mid, 1), baseMovie, append(conf, WithCreator(u))...)
return NewMovieWithBaseMovie(atomic.AddUint64(&u.room.mid, 1), baseMovie, conf...)
} }
func (u *User) Movie(id uint64) (*MovieInfo, error) { func (u *User) Movie(id uint64) (*MovieInfo, error) {

@ -145,7 +145,6 @@ func NewCreateRoomHandler(s *rtmps.Server) gin.HandlerFunc {
r, err := Rooms.CreateRoom(req.RoomID, req.Password, s, r, err := Rooms.CreateRoom(req.RoomID, req.Password, s,
room.WithHidden(req.Hidden), room.WithHidden(req.Hidden),
room.WithMaxInactivityTime(roomMaxInactivityTime),
room.WithRootUser(user), room.WithRootUser(user),
) )
if err != nil { if err != nil {
@ -159,6 +158,9 @@ func NewCreateRoomHandler(s *rtmps.Server) gin.HandlerFunc {
return return
} }
r.Init()
r.Start()
go func() { go func() {
ticker := time.NewTicker(time.Second * 5) ticker := time.NewTicker(time.Second * 5)
defer ticker.Stop() defer ticker.Stop()
@ -187,10 +189,6 @@ func NewCreateRoomHandler(s *rtmps.Server) gin.HandlerFunc {
} }
}() }()
r.Start()
r.SetRootUser(user)
ctx.JSON(http.StatusCreated, NewApiDataResp(gin.H{ ctx.JSON(http.StatusCreated, NewApiDataResp(gin.H{
"token": token, "token": token,
})) }))
@ -218,7 +216,7 @@ func RoomList(ctx *gin.Context) {
PeopleNum: v.ClientNum(), PeopleNum: v.ClientNum(),
NeedPassword: v.NeedPassword(), NeedPassword: v.NeedPassword(),
Creator: v.RootUser().Name(), Creator: v.RootUser().Name(),
CreateAt: v.CreateAt().UnixMilli(), CreateAt: v.CreateAt(),
}) })
} }
@ -364,8 +362,14 @@ func DeleteRoom(ctx *gin.Context) {
return return
} }
if err := Rooms.DelRoom(user.Room().ID()); err != nil { if !user.IsRoot() {
ctx.AbortWithStatusJSON(http.StatusBadRequest, NewApiErrorResp(err)) ctx.AbortWithStatusJSON(http.StatusUnauthorized, NewApiErrorStringResp("only root can close room"))
return
}
err = Rooms.DelRoom(user.Room().ID())
if err != nil {
ctx.AbortWithStatusJSON(http.StatusInternalServerError, NewApiErrorResp(err))
return return
} }

@ -7,7 +7,6 @@ import (
"github.com/synctv-org/synctv/room" "github.com/synctv-org/synctv/room"
"github.com/zijiren233/gencontainer/rwmap" "github.com/zijiren233/gencontainer/rwmap"
"github.com/zijiren233/ksync"
rtmps "github.com/zijiren233/livelib/server" rtmps "github.com/zijiren233/livelib/server"
) )
@ -33,26 +32,16 @@ var (
) )
type rooms struct { type rooms struct {
rooms *rwmap.RWMap[string, *room.Room] rooms rwmap.RWMap[string, *room.Room]
lock *ksync.Krwmutex
} }
func newRooms() *rooms { func newRooms() *rooms {
return &rooms{ return &rooms{}
rooms: &rwmap.RWMap[string, *room.Room]{},
lock: ksync.NewKrwmutex(),
}
} }
func (rs *rooms) List() (rooms []*room.Room) { func (rs *rooms) List() (rooms []*room.Room) {
rooms = make([]*room.Room, 0, rs.rooms.Len()) rooms = make([]*room.Room, 0, rs.rooms.Len())
rs.rooms.Range(func(id string, r *room.Room) bool { rs.rooms.Range(func(id string, r *room.Room) bool {
rs.lock.RLock(id)
defer rs.lock.RUnlock(id)
if r.Closed() {
rs.rooms.Delete(id)
return true
}
rooms = append(rooms, r) rooms = append(rooms, r)
return true return true
}) })
@ -62,12 +51,6 @@ func (rs *rooms) List() (rooms []*room.Room) {
func (rs *rooms) ListNonHidden() (rooms []*room.Room) { func (rs *rooms) ListNonHidden() (rooms []*room.Room) {
rooms = make([]*room.Room, 0, rs.rooms.Len()) rooms = make([]*room.Room, 0, rs.rooms.Len())
rs.rooms.Range(func(id string, r *room.Room) bool { rs.rooms.Range(func(id string, r *room.Room) bool {
rs.lock.RLock(id)
defer rs.lock.RUnlock(id)
if r.Closed() {
rs.rooms.Delete(id)
return true
}
if !r.Hidden() { if !r.Hidden() {
rooms = append(rooms, r) rooms = append(rooms, r)
} }
@ -79,12 +62,6 @@ func (rs *rooms) ListNonHidden() (rooms []*room.Room) {
func (rs *rooms) ListHidden() (rooms []*room.Room) { func (rs *rooms) ListHidden() (rooms []*room.Room) {
rooms = make([]*room.Room, 0, rs.rooms.Len()) rooms = make([]*room.Room, 0, rs.rooms.Len())
rs.rooms.Range(func(id string, r *room.Room) bool { rs.rooms.Range(func(id string, r *room.Room) bool {
rs.lock.RLock(id)
defer rs.lock.RUnlock(id)
if r.Closed() {
rs.rooms.Delete(id)
return true
}
if r.Hidden() { if r.Hidden() {
rooms = append(rooms, r) rooms = append(rooms, r)
} }
@ -94,12 +71,7 @@ func (rs *rooms) ListHidden() (rooms []*room.Room) {
} }
func (rs *rooms) HasRoom(id string) bool { func (rs *rooms) HasRoom(id string) bool {
rs.lock.Lock(id) _, ok := rs.rooms.Load(id)
defer rs.lock.Unlock(id)
r, ok := rs.rooms.Load(id)
if !ok || r.Closed() {
return false
}
return ok return ok
} }
@ -107,30 +79,22 @@ func (rs *rooms) GetRoom(id string) (*room.Room, error) {
if id == "" { if id == "" {
return nil, ErrRoomIDEmpty return nil, ErrRoomIDEmpty
} }
rs.lock.RLock(id)
defer rs.lock.RUnlock(id)
r, ok := rs.rooms.Load(id) r, ok := rs.rooms.Load(id)
if !ok || r.Closed() { if !ok {
return nil, ErrRoomNotFound return nil, ErrRoomNotFound
} }
return r, nil return r, nil
} }
func (rs *rooms) CreateRoom(id string, password string, s *rtmps.Server, conf ...room.RoomConf) (*room.Room, error) { func (rs *rooms) CreateRoom(id string, password string, s *rtmps.Server, conf ...room.RoomConf) (*room.Room, error) {
if id == "" {
return nil, ErrRoomIDEmpty
}
rs.lock.Lock(id)
defer rs.lock.Unlock(id)
if oldR, ok := rs.rooms.Load(id); ok && !oldR.Closed() {
return nil, ErrRoomAlreadyExist
}
r, err := room.NewRoom(id, password, s, conf...) r, err := room.NewRoom(id, password, s, conf...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
rs.rooms.Store(id, r) r, loaded := rs.rooms.LoadOrStore(r.ID(), r)
if loaded {
return nil, ErrRoomAlreadyExist
}
return r, nil return r, nil
} }
@ -138,8 +102,6 @@ func (rs *rooms) DelRoom(id string) error {
if id == "" { if id == "" {
return ErrRoomIDEmpty return ErrRoomIDEmpty
} }
rs.lock.Lock(id)
defer rs.lock.Unlock(id)
r, ok := rs.rooms.LoadAndDelete(id) r, ok := rs.rooms.LoadAndDelete(id)
if !ok { if !ok {
return ErrRoomNotFound return ErrRoomNotFound

@ -7,8 +7,9 @@ import (
) )
func Init(e *gin.Engine) { func Init(e *gin.Engine) {
w := log.StandardLogger().Writer()
e. e.
Use(gin.LoggerWithWriter(log.StandardLogger().Out), gin.RecoveryWithWriter(log.StandardLogger().Out)). Use(gin.LoggerWithWriter(w), gin.RecoveryWithWriter(w)).
Use(NewCors()) Use(NewCors())
if conf.Conf.Server.Quic { if conf.Conf.Server.Quic {
e.Use(NewQuic()) e.Use(NewQuic())

Loading…
Cancel
Save