package api

import (
	"context"
	"sync"

	"github.com/RasmusLindroth/go-mastodon"
)

// MastodonType identifies a kind of Mastodon item.
type MastodonType uint

// The MastodonType values.
const (
	StatusType MastodonType = iota
	StatusHistoryType
	UserType
	ProfileType
	NotificationType
	ListsType
	TagType
)

// StreamType selects which streaming endpoint a Stream is connected to.
type StreamType uint

// The StreamType values accepted by NewGenericStream and
// RemoveGenericReceiver.
const (
	HomeStream StreamType = iota
	LocalStream
	FederatedStream
	DirectStream
	TagStream
	ListStream
)

// Stream fans the events of one streaming connection out to any number of
// receivers.
//
// mux guards receivers, closed and cancel. cancel, when set, cancels the
// context the connection was opened with.
type Stream struct {
	id        string
	receivers []*Receiver
	incoming  chan mastodon.Event
	closed    bool
	cancel    context.CancelFunc
	mux       sync.Mutex
}

// Receiver is one subscription to a Stream. Events arrive on Ch, which is
// closed once the receiver has been removed from its stream.
//
// mux guards Closed. done is closed on removal to release senders that are
// blocked on Ch, and sends counts the deliveries currently in flight so that
// Ch is only closed after all of them have finished. Receivers must be
// created through Stream.AddReceiver so that done is initialised.
type Receiver struct {
	Ch     chan mastodon.Event
	Closed bool
	mux    sync.Mutex
	done   chan struct{}
	sends  sync.WaitGroup
}

// newReceiver returns an open receiver with an unbuffered event channel.
func newReceiver() *Receiver {
	return &Receiver{
		Ch:   make(chan mastodon.Event),
		done: make(chan struct{}),
	}
}

// deliver sends e to the receiver and blocks until it is read or the receiver
// is shut down. It is a no-op on a receiver that is already closed.
func (r *Receiver) deliver(e mastodon.Event) {
	r.mux.Lock()
	if r.Closed {
		r.mux.Unlock()
		return
	}
	// Register while holding mux so shutdown cannot close Ch under us.
	r.sends.Add(1)
	r.mux.Unlock()
	defer r.sends.Done()

	select {
	case r.Ch <- e:
	case <-r.done:
	}
}

// shutdown marks the receiver as closed, releases blocked senders and then
// closes Ch. Calling it more than once is safe.
func (r *Receiver) shutdown() {
	r.mux.Lock()
	if r.Closed {
		r.mux.Unlock()
		return
	}
	r.Closed = true
	close(r.done)
	r.mux.Unlock()

	// No new sender can register once Closed is set; wait for the ones in
	// flight so that closing Ch can never race with a send.
	r.sends.Wait()
	close(r.Ch)
}

// ID returns the identifier of the stream.
func (s *Stream) ID() string {
	return s.id
}

// AddReceiver registers a new receiver on the stream and returns it.
func (s *Stream) AddReceiver() *Receiver {
	rec := newReceiver()

	s.mux.Lock()
	s.receivers = append(s.receivers, rec)
	s.mux.Unlock()

	return rec
}

// addReceiverIfOpen registers a new receiver unless the stream has already
// been shut down, in which case it reports false.
func (s *Stream) addReceiverIfOpen() (*Receiver, bool) {
	s.mux.Lock()
	defer s.mux.Unlock()

	if s.closed {
		return nil, false
	}
	rec := newReceiver()
	s.receivers = append(s.receivers, rec)
	return rec, true
}

// RemoveReceiver unregisters r, matched by its channel, and closes it.
// Passing nil or a receiver that is not registered on the stream is a no-op.
func (s *Stream) RemoveReceiver(r *Receiver) {
	if r == nil {
		return
	}

	s.mux.Lock()
	index := -1
	for i, rec := range s.receivers {
		if rec.Ch == r.Ch {
			index = i
			break
		}
	}
	if index == -1 {
		s.mux.Unlock()
		return
	}
	target := s.receivers[index]
	last := len(s.receivers) - 1
	copy(s.receivers[index:], s.receivers[index+1:])
	s.receivers[last] = nil // drop the stale pointer left in the tail
	s.receivers = s.receivers[:last]
	s.mux.Unlock()

	// Closed outside s.mux: shutdown waits for in-flight deliveries.
	target.shutdown()
}

// listen forwards events from the incoming channel to every registered
// receiver until that channel is closed. Event types other than the ones
// listed below are dropped. Each delivery runs in its own goroutine so one
// slow receiver does not hold up the others.
func (s *Stream) listen() {
	for e := range s.incoming {
		switch e.(type) {
		case *mastodon.UpdateEvent,
			*mastodon.ConversationEvent,
			*mastodon.NotificationEvent,
			*mastodon.DeleteEvent,
			*mastodon.ErrorEvent:
		default:
			continue
		}

		// Snapshot under the lock; receivers may be added or removed
		// concurrently.
		s.mux.Lock()
		recs := make([]*Receiver, len(s.receivers))
		copy(recs, s.receivers)
		s.mux.Unlock()

		for _, r := range recs {
			go r.deliver(e)
		}
	}
}

// newStream creates a stream with the given id that reads from inc, registers
// a first receiver on it and starts forwarding events in the background.
func newStream(id string, inc chan mastodon.Event) (*Stream, *Receiver) {
	stream := &Stream{
		id:       id,
		incoming: inc,
	}
	rec := stream.AddReceiver()
	go stream.listen()
	return stream, rec
}

// streamID builds the key under which a stream is stored in
// AccountClient.Streams. It panics on an unknown StreamType.
func streamID(st StreamType, data string) string {
	switch st {
	case HomeStream:
		return "HomeStream"
	case LocalStream:
		return "LocalStream"
	case FederatedStream:
		return "FederatedStream"
	case DirectStream:
		return "DirectStream"
	case TagStream:
		return "TagStream" + data
	case ListStream:
		return "ListStream" + data
	default:
		panic("invalid StreamType")
	}
}

// NewGenericStream returns a receiver for the stream identified by st and
// data (the hashtag for TagStream, the list ID for ListStream, unused
// otherwise). If such a stream is already open a receiver is added to it;
// otherwise a new streaming connection is opened. It panics on an unknown
// StreamType.
func (ac *AccountClient) NewGenericStream(st StreamType, data string) (rec *Receiver, err error) {
	id := streamID(st, data)

	if existing, ok := ac.Streams[id]; ok {
		if r, open := existing.addReceiverIfOpen(); open {
			return r, nil
		}
	}

	// The stream gets its own cancellable context so the connection can be
	// stopped once the last receiver is gone.
	ctx, cancel := context.WithCancel(context.Background())

	var ch chan mastodon.Event
	switch st {
	case HomeStream:
		ch, err = ac.WSClient.StreamingWSUser(ctx)
	case LocalStream:
		ch, err = ac.WSClient.StreamingWSPublic(ctx, true)
	case FederatedStream:
		ch, err = ac.WSClient.StreamingWSPublic(ctx, false)
	case DirectStream:
		ch, err = ac.WSClient.StreamingWSDirect(ctx)
	case TagStream:
		ch, err = ac.WSClient.StreamingWSHashtag(ctx, data, false)
	case ListStream:
		ch, err = ac.WSClient.StreamingWSList(ctx, mastodon.ID(data))
	default:
		panic("invalid StreamType")
	}
	if err != nil {
		cancel()
		return nil, err
	}

	stream, rec := newStream(id, ch)
	stream.mux.Lock()
	stream.cancel = cancel
	stream.mux.Unlock()

	ac.Streams[stream.ID()] = stream
	return rec, nil
}

// NewHomeStream subscribes to the home stream. The returned string is always
// "home".
func (ac *AccountClient) NewHomeStream() (*Receiver, string, error) {
	rec, err := ac.NewGenericStream(HomeStream, "")
	return rec, "home", err
}

// NewLocalStream subscribes to the local stream. The returned string is
// always "public".
func (ac *AccountClient) NewLocalStream() (*Receiver, string, error) {
	rec, err := ac.NewGenericStream(LocalStream, "")
	return rec, "public", err
}

// NewFederatedStream subscribes to the federated stream. The returned string
// is always "public".
func (ac *AccountClient) NewFederatedStream() (*Receiver, string, error) {
	rec, err := ac.NewGenericStream(FederatedStream, "")
	return rec, "public", err
}

// NewDirectStream subscribes to the direct stream. The returned string is
// always "public".
func (ac *AccountClient) NewDirectStream() (*Receiver, string, error) {
	rec, err := ac.NewGenericStream(DirectStream, "")
	return rec, "public", err
}

// NewListStream subscribes to the stream of the list with the given id. The
// returned string is always "home".
func (ac *AccountClient) NewListStream(id mastodon.ID) (*Receiver, string, error) {
	rec, err := ac.NewGenericStream(ListStream, string(id))
	return rec, "home", err
}

// NewTagStream subscribes to the stream of the given hashtag. The returned
// string is always "public".
func (ac *AccountClient) NewTagStream(tag string) (*Receiver, string, error) {
	rec, err := ac.NewGenericStream(TagStream, tag)
	return rec, "public", err
}

// RemoveGenericReceiver removes rec from the stream identified by st and
// data. When that leaves the stream without receivers, the stream is marked
// closed, dropped from ac.Streams and its context is cancelled. It does
// nothing if no such stream is registered and panics on an unknown
// StreamType.
func (ac *AccountClient) RemoveGenericReceiver(rec *Receiver, st StreamType, data string) {
	id := streamID(st, data)

	stream, ok := ac.Streams[id]
	if !ok {
		return
	}
	stream.RemoveReceiver(rec)

	stream.mux.Lock()
	var cancel context.CancelFunc
	last := len(stream.receivers) == 0 && !stream.closed
	if last {
		stream.closed = true
		cancel = stream.cancel
	}
	stream.mux.Unlock()

	if !last {
		return
	}
	delete(ac.Streams, id)
	if cancel != nil {
		// NOTE(review): relies on go-mastodon ending the connection and
		// closing the event channel when its context is cancelled, which
		// in turn ends listen — confirm against the library.
		cancel()
	}
}

// RemoveHomeReceiver removes rec from the home stream.
func (ac *AccountClient) RemoveHomeReceiver(rec *Receiver) {
	ac.RemoveGenericReceiver(rec, HomeStream, "")
}

// RemoveLocalReceiver removes rec from the local stream.
func (ac *AccountClient) RemoveLocalReceiver(rec *Receiver) {
	ac.RemoveGenericReceiver(rec, LocalStream, "")
}

// RemoveConversationReceiver removes rec from the direct stream.
func (ac *AccountClient) RemoveConversationReceiver(rec *Receiver) {
	ac.RemoveGenericReceiver(rec, DirectStream, "")
}

// RemoveFederatedReceiver removes rec from the federated stream.
func (ac *AccountClient) RemoveFederatedReceiver(rec *Receiver) {
	ac.RemoveGenericReceiver(rec, FederatedStream, "")
}

// RemoveListReceiver removes rec from the stream of the list with the given
// id.
func (ac *AccountClient) RemoveListReceiver(rec *Receiver, id mastodon.ID) {
	ac.RemoveGenericReceiver(rec, ListStream, string(id))
}

// RemoveTagReceiver removes rec from the stream of the given hashtag.
func (ac *AccountClient) RemoveTagReceiver(rec *Receiver, tag string) {
	ac.RemoveGenericReceiver(rec, TagStream, tag)
}