From 663c3ed7f86cdbcb7aa1c44ae2b9e74578d0b7b9 Mon Sep 17 00:00:00 2001 From: iacore Date: Mon, 27 Oct 2025 08:19:44 +0000 Subject: [PATCH] refactor api/feed.go --- api/feed.go | 116 +++++++++----------------- api/stream.go | 223 +++++++++++++++++++------------------------------- api/types.go | 2 +- feed/feed.go | 2 +- todo | 14 ++++ ui/tutview.go | 2 +- 6 files changed, 139 insertions(+), 220 deletions(-) diff --git a/api/feed.go b/api/feed.go index 79f8eca..c245cf9 100644 --- a/api/feed.go +++ b/api/feed.go @@ -10,9 +10,8 @@ import ( type TimelineType uint -func (ac *AccountClient) getStatusSimilar(fn func() ([]*mastodon.Status, error), timeline string) ([]Item, error) { +func (ac *AccountClient) getStatusSimilar(statuses []*mastodon.Status, err error) ([]Item, error) { var items []Item - statuses, err := fn() if err != nil { return items, err } @@ -23,9 +22,8 @@ func (ac *AccountClient) getStatusSimilar(fn func() ([]*mastodon.Status, error), return items, nil } -func (ac *AccountClient) getUserSimilar(fn func() ([]*mastodon.Account, error), data interface{}) ([]Item, error) { +func (ac *AccountClient) getUserSimilar(users []*mastodon.Account, err error, data interface{}) ([]Item, error) { var items []Item - users, err := fn() if err != nil { return items, err } @@ -53,24 +51,15 @@ func (ac *AccountClient) getUserSimilar(fn func() ([]*mastodon.Account, error), } func (ac *AccountClient) GetTimeline(pg *mastodon.Pagination) ([]Item, error) { - fn := func() ([]*mastodon.Status, error) { - return ac.Client.GetTimelineHome(context.Background(), pg) - } - return ac.getStatusSimilar(fn, "home") + return ac.getStatusSimilar(ac.Client.GetTimelineHome(context.Background(), pg)) } func (ac *AccountClient) GetTimelineFederated(pg *mastodon.Pagination) ([]Item, error) { - fn := func() ([]*mastodon.Status, error) { - return ac.Client.GetTimelinePublic(context.Background(), false, pg) - } - return ac.getStatusSimilar(fn, "public") + return ac.getStatusSimilar(ac.Client.GetTimelinePublic(context.Background(), false, pg)) } func (ac *AccountClient) GetTimelineLocal(pg *mastodon.Pagination) ([]Item, error) { - fn := func() ([]*mastodon.Status, error) { - return ac.Client.GetTimelinePublic(context.Background(), true, pg) - } - return ac.getStatusSimilar(fn, "public") + return ac.getStatusSimilar(ac.Client.GetTimelinePublic(context.Background(), true, pg)) } func (ac *AccountClient) GetNotifications(nth []config.NotificationToHide, pg *mastodon.Pagination) ([]Item, error) { @@ -134,17 +123,11 @@ func (ac *AccountClient) GetThread(status *mastodon.Status) ([]Item, error) { } func (ac *AccountClient) GetFavorites(pg *mastodon.Pagination) ([]Item, error) { - fn := func() ([]*mastodon.Status, error) { - return ac.Client.GetFavourites(context.Background(), pg) - } - return ac.getStatusSimilar(fn, "home") + return ac.getStatusSimilar(ac.Client.GetFavourites(context.Background(), pg)) } func (ac *AccountClient) GetBookmarks(pg *mastodon.Pagination) ([]Item, error) { - fn := func() ([]*mastodon.Status, error) { - return ac.Client.GetBookmarks(context.Background(), pg) - } - return ac.getStatusSimilar(fn, "home") + return ac.getStatusSimilar(ac.Client.GetBookmarks(context.Background(), pg)) } func (ac *AccountClient) GetConversations(pg *mastodon.Pagination) ([]Item, error) { @@ -199,52 +182,38 @@ func (ac *AccountClient) GetUsers(search string) ([]Item, error) { } func (ac *AccountClient) GetBoostsStatus(pg *mastodon.Pagination, id mastodon.ID) ([]Item, error) { - fn := func() ([]*mastodon.Account, error) { - return ac.Client.GetRebloggedBy(context.Background(), id, pg) - } - return ac.getUserSimilar(fn, nil) + users, err := ac.Client.GetRebloggedBy(context.Background(), id, pg) + return ac.getUserSimilar(users, err, nil) } func (ac *AccountClient) GetFavoritesStatus(pg *mastodon.Pagination, id mastodon.ID) ([]Item, error) { - fn := func() ([]*mastodon.Account, error) { - return ac.Client.GetFavouritedBy(context.Background(), id, pg) - } - return ac.getUserSimilar(fn, nil) + users, err := ac.Client.GetFavouritedBy(context.Background(), id, pg) + return ac.getUserSimilar(users, err, nil) } func (ac *AccountClient) GetFollowers(pg *mastodon.Pagination, id mastodon.ID) ([]Item, error) { - fn := func() ([]*mastodon.Account, error) { - return ac.Client.GetAccountFollowers(context.Background(), id, pg) - } - return ac.getUserSimilar(fn, nil) + users, err := ac.Client.GetAccountFollowers(context.Background(), id, pg) + return ac.getUserSimilar(users, err, nil) } func (ac *AccountClient) GetFollowing(pg *mastodon.Pagination, id mastodon.ID) ([]Item, error) { - fn := func() ([]*mastodon.Account, error) { - return ac.Client.GetAccountFollowing(context.Background(), id, pg) - } - return ac.getUserSimilar(fn, nil) + users, err := ac.Client.GetAccountFollowing(context.Background(), id, pg) + return ac.getUserSimilar(users, err, nil) } func (ac *AccountClient) GetBlocking(pg *mastodon.Pagination) ([]Item, error) { - fn := func() ([]*mastodon.Account, error) { - return ac.Client.GetBlocks(context.Background(), pg) - } - return ac.getUserSimilar(fn, nil) + users, err := ac.Client.GetBlocks(context.Background(), pg) + return ac.getUserSimilar(users, err, nil) } func (ac *AccountClient) GetMuting(pg *mastodon.Pagination) ([]Item, error) { - fn := func() ([]*mastodon.Account, error) { - return ac.Client.GetMutes(context.Background(), pg) - } - return ac.getUserSimilar(fn, nil) + users, err := ac.Client.GetMutes(context.Background(), pg) + return ac.getUserSimilar(users, err, nil) } func (ac *AccountClient) GetFollowRequests(pg *mastodon.Pagination) ([]Item, error) { - fn := func() ([]*mastodon.Account, error) { - return ac.Client.GetFollowRequests(context.Background(), pg) - } - return ac.getUserSimilar(fn, nil) + users, err := ac.Client.GetFollowRequests(context.Background(), pg) + return ac.getUserSimilar(users, err, nil) } func (ac *AccountClient) GetUser(pg *mastodon.Pagination, id mastodon.ID) ([]Item, error) { @@ -311,41 +280,32 @@ func (ac *AccountClient) GetListStatuses(pg *mastodon.Pagination, id mastodon.ID } func (ac *AccountClient) GetFollowingForList(pg *mastodon.Pagination, id mastodon.ID, data interface{}) ([]Item, error) { - fn := func() ([]*mastodon.Account, error) { - return ac.Client.GetAccountFollowing(context.Background(), id, pg) - } - return ac.getUserSimilar(fn, data) + users, err := ac.Client.GetAccountFollowing(context.Background(), id, pg) + return ac.getUserSimilar(users, err, data) } func (ac *AccountClient) GetListUsers(pg *mastodon.Pagination, id mastodon.ID, data interface{}) ([]Item, error) { - fn := func() ([]*mastodon.Account, error) { - return ac.Client.GetListAccounts(context.Background(), id) - } - return ac.getUserSimilar(fn, data) + users, err := ac.Client.GetListAccounts(context.Background(), id) + return ac.getUserSimilar(users, err, data) } func (ac *AccountClient) GetTag(pg *mastodon.Pagination, search string) ([]Item, error) { - fn := func() ([]*mastodon.Status, error) { - return ac.Client.GetTimelineHashtag(context.Background(), search, false, pg) - } - return ac.getStatusSimilar(fn, "public") + return ac.getStatusSimilar(ac.Client.GetTimelineHashtag(context.Background(), search, false, pg)) } func (ac *AccountClient) GetTagMultiple(pg *mastodon.Pagination, search string) ([]Item, error) { - fn := func() ([]*mastodon.Status, error) { - var s string - td := mastodon.TagData{} - parts := strings.Split(search, " ") - for i, p := range parts { - if i == 0 { - s = p - continue - } - if len(p) > 0 { - td.Any = append(td.Any, p) - } + var s string + td := mastodon.TagData{} + parts := strings.Split(search, " ") + for i, p := range parts { + if i == 0 { + s = p + continue + } + if len(p) > 0 { + td.Any = append(td.Any, p) } - return ac.Client.GetTimelineHashtagMultiple(context.Background(), s, false, &td, pg) } - return ac.getStatusSimilar(fn, "public") + items, err := ac.Client.GetTimelineHashtagMultiple(context.Background(), s, false, &td, pg) + return ac.getStatusSimilar(items, err) } diff --git a/api/stream.go b/api/stream.go index 01e2ecd..6e48cae 100644 --- a/api/stream.go +++ b/api/stream.go @@ -30,38 +30,49 @@ const ( ListStream ) +type StreamID string + +func MakeStreamID(st StreamType, data string) StreamID { + switch st { + case HomeStream: + return "HomeStream" + case LocalStream: + return "LocalStream" + case FederatedStream: + return "FederatedStream" + case DirectStream: + return "DirectStream" + case TagStream: + return StreamID("TagStream" + data) + case ListStream: + return StreamID("ListStream" + data) + default: + panic("invalid StreamType") + } +} + +type Receiver func(mastodon.Event) // always use *Receiver, because == comparison need it + type Stream struct { - id string + id StreamID receivers []*Receiver incoming chan mastodon.Event - closed bool + cancel context.CancelFunc mux sync.Mutex } -type Receiver struct { - Ch chan mastodon.Event - Closed bool - mux sync.Mutex -} - -func (s *Stream) ID() string { +func (s *Stream) ID() StreamID { return s.id } -func (s *Stream) AddReceiver() *Receiver { - ch := make(chan mastodon.Event) - rec := &Receiver{ - Ch: ch, - Closed: false, - } - s.receivers = append(s.receivers, rec) - return rec +func (s *Stream) AddReceiver(r *Receiver) { + s.receivers = append(s.receivers, r) } func (s *Stream) RemoveReceiver(r *Receiver) { index := -1 for i, rec := range s.receivers { - if rec.Ch == r.Ch { + if rec == r { index = i break } @@ -69,173 +80,107 @@ func (s *Stream) RemoveReceiver(r *Receiver) { if index == -1 { return } - s.receivers[index].mux.Lock() - if !s.receivers[index].Closed { - close(s.receivers[index].Ch) - s.receivers[index].Closed = true - } - s.receivers[index].mux.Unlock() s.receivers = append(s.receivers[:index], s.receivers[index+1:]...) } -func (s *Stream) listen() { - for e := range s.incoming { - switch e.(type) { - case *mastodon.UpdateEvent, *mastodon.ConversationEvent, *mastodon.NotificationEvent, *mastodon.DeleteEvent, *mastodon.ErrorEvent: - for _, r := range s.receivers { - go func(rec *Receiver, e mastodon.Event) { - rec.mux.Lock() - if rec.Closed { - return - } - rec.mux.Unlock() - rec.Ch <- e - }(r, e) +func (s *Stream) listen(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case e := <-s.incoming: + switch e.(type) { + case *mastodon.UpdateEvent, *mastodon.ConversationEvent, *mastodon.NotificationEvent, *mastodon.DeleteEvent, *mastodon.ErrorEvent: + for _, r := range s.receivers { + (*r)(e) + } } } } } -func newStream(id string, inc chan mastodon.Event) (*Stream, *Receiver) { +func newStream(ctx context.Context, id StreamID, input chan mastodon.Event) *Stream { + ctx, cancel := context.WithCancel(ctx) stream := &Stream{ id: id, - incoming: inc, + incoming: input, + cancel: cancel, } - rec := stream.AddReceiver() - go stream.listen() - return stream, rec + go stream.listen(ctx) + return stream } -func (ac *AccountClient) NewGenericStream(st StreamType, data string) (rec *Receiver, err error) { - var id string - switch st { - case HomeStream: - id = "HomeStream" - case LocalStream: - id = "LocalStream" - case FederatedStream: - id = "FederatedStream" - case DirectStream: - id = "DirectStream" - case TagStream: - id = "TagStream" + data - case ListStream: - id = "ListStream" + data - default: - panic("invalid StreamType") - } +func (ac *AccountClient) CreateOrGetStream(ctx context.Context, st StreamType, data string) (stream *Stream, err error) { + id := MakeStreamID(st, data) + + // get stream for _, s := range ac.Streams { if s.ID() == id { - rec = s.AddReceiver() - return rec, nil + return s, nil } } + + // create steram var ch chan mastodon.Event switch st { case HomeStream: - ch, err = ac.WSClient.StreamingWSUser(context.Background()) + ch, err = ac.WSClient.StreamingWSUser(ctx) case LocalStream: - ch, err = ac.WSClient.StreamingWSPublic(context.Background(), true) + ch, err = ac.WSClient.StreamingWSPublic(ctx, true) case FederatedStream: - ch, err = ac.WSClient.StreamingWSPublic(context.Background(), false) + ch, err = ac.WSClient.StreamingWSPublic(ctx, false) case DirectStream: - ch, err = ac.WSClient.StreamingWSDirect(context.Background()) + ch, err = ac.WSClient.StreamingWSDirect(ctx) case TagStream: - ch, err = ac.WSClient.StreamingWSHashtag(context.Background(), data, false) + ch, err = ac.WSClient.StreamingWSHashtag(ctx, data, false) case ListStream: - ch, err = ac.WSClient.StreamingWSList(context.Background(), mastodon.ID(data)) + ch, err = ac.WSClient.StreamingWSList(ctx, mastodon.ID(data)) default: panic("invalid StreamType") } if err != nil { return nil, err } - stream, rec := newStream(id, ch) + stream = newStream(ctx, id, ch) ac.Streams[stream.ID()] = stream - return rec, nil + return stream, nil } -func (ac *AccountClient) NewHomeStream() (*Receiver, string, error) { - rec, err := ac.NewGenericStream(HomeStream, "") - return rec, "home", err -} - -func (ac *AccountClient) NewLocalStream() (*Receiver, string, error) { - rec, err := ac.NewGenericStream(LocalStream, "") - return rec, "public", err -} - -func (ac *AccountClient) NewFederatedStream() (*Receiver, string, error) { - rec, err := ac.NewGenericStream(FederatedStream, "") - return rec, "public", err -} - -func (ac *AccountClient) NewDirectStream() (*Receiver, string, error) { - rec, err := ac.NewGenericStream(DirectStream, "") - return rec, "public", err -} - -func (ac *AccountClient) NewListStream(id mastodon.ID) (*Receiver, string, error) { - rec, err := ac.NewGenericStream(ListStream, string(id)) - return rec, "home", err -} - -func (ac *AccountClient) NewTagStream(tag string) (*Receiver, string, error) { - rec, err := ac.NewGenericStream(TagStream, tag) - return rec, "public", err -} - -func (ac *AccountClient) RemoveGenericReceiver(rec *Receiver, st StreamType, data string) { - var id string - switch st { - case HomeStream: - id = "HomeStream" - case LocalStream: - id = "LocalStream" - case FederatedStream: - id = "FederatedStream" - case DirectStream: - id = "DirectStream" - case TagStream: - id = "TagStream" + data - case ListStream: - id = "ListStream" + data - default: - panic("invalid StreamType") - } +func (ac *AccountClient) RemoveReceiver(rec *Receiver, st StreamType, data string) { + id := MakeStreamID(st, data) stream, ok := ac.Streams[id] if !ok { return } - stream.RemoveReceiver(rec) stream.mux.Lock() - if len(stream.receivers) == 0 && !stream.closed { - stream.closed = true + stream.RemoveReceiver(rec) + if len(stream.receivers) == 0 { + stream.cancel() delete(ac.Streams, id) } stream.mux.Unlock() } -func (ac *AccountClient) RemoveHomeReceiver(rec *Receiver) { - ac.RemoveGenericReceiver(rec, HomeStream, "") -} +// func (ac *AccountClient) RemoveHomeReceiver(rec *Receiver) { +// ac.RemoveGenericReceiver(rec, HomeStream, "") +// } -func (ac *AccountClient) RemoveLocalReceiver(rec *Receiver) { - ac.RemoveGenericReceiver(rec, LocalStream, "") -} +// func (ac *AccountClient) RemoveLocalReceiver(rec *Receiver) { +// ac.RemoveGenericReceiver(rec, LocalStream, "") +// } -func (ac *AccountClient) RemoveConversationReceiver(rec *Receiver) { - ac.RemoveGenericReceiver(rec, DirectStream, "") -} +// func (ac *AccountClient) RemoveConversationReceiver(rec *Receiver) { +// ac.RemoveGenericReceiver(rec, DirectStream, "") +// } -func (ac *AccountClient) RemoveFederatedReceiver(rec *Receiver) { - ac.RemoveGenericReceiver(rec, FederatedStream, "") -} +// func (ac *AccountClient) RemoveFederatedReceiver(rec *Receiver) { +// ac.RemoveGenericReceiver(rec, FederatedStream, "") +// } -func (ac *AccountClient) RemoveListReceiver(rec *Receiver, id mastodon.ID) { - ac.RemoveGenericReceiver(rec, ListStream, string(id)) -} +// func (ac *AccountClient) RemoveListReceiver(rec *Receiver, id mastodon.ID) { +// ac.RemoveGenericReceiver(rec, ListStream, string(id)) +// } -func (ac *AccountClient) RemoveTagReceiver(rec *Receiver, tag string) { - ac.RemoveGenericReceiver(rec, TagStream, tag) -} +// func (ac *AccountClient) RemoveTagReceiver(rec *Receiver, tag string) { +// ac.RemoveGenericReceiver(rec, TagStream, tag) +// } diff --git a/api/types.go b/api/types.go index a969ff8..470d428 100644 --- a/api/types.go +++ b/api/types.go @@ -9,7 +9,7 @@ type RequestData struct { type AccountClient struct { Client *mastodon.Client - Streams map[string]*Stream + Streams map[StreamID]*Stream Me *mastodon.Account WSClient *mastodon.WSClient InstanceOld *mastodon.Instance diff --git a/feed/feed.go b/feed/feed.go index 6565b91..b8758e0 100644 --- a/feed/feed.go +++ b/feed/feed.go @@ -748,7 +748,7 @@ func (f *Feed) startStream(rec *api.Receiver, timeline string, err error) { }() } -func (f *Feed) startStreamNotification(rec *api.Receiver, timeline string, err error, mentions bool) { +func (f *Feed) startStreamNotification(rec *api.Receiver, err error, mentions bool) { if err != nil { log.Fatalln("Couldn't open stream") } diff --git a/todo b/todo index a193f54..39e08fc 100644 --- a/todo +++ b/todo @@ -5,3 +5,17 @@ Race conditions. Opening multiple threads in one panel might race condition and kitty image protocol switch keymap to kakoune + + +## stream refactor + +stream cached inside ac.Streams +each stream can have multiple receivers +receivers close themselves when dropped + +## feed refactor (feed/feed.go, ui/feed.go) + +desktop feed. based on streams. + +todo: turn the API from pull based to push based + diff --git a/ui/tutview.go b/ui/tutview.go index 77d1cb2..afcf426 100644 --- a/ui/tutview.go +++ b/ui/tutview.go @@ -231,7 +231,7 @@ func (tv *TutView) loggedIn(acc auth.Account) { ac := &api.AccountClient{ Me: me, Client: client, - Streams: make(map[string]*api.Stream), + Streams: make(map[api.StreamID]*api.Stream), WSClient: client.NewWSClient(), } inst, err := ac.Client.GetInstanceV2(context.Background())