From 6c4b8185d7f9b4f66cc673fd13c448dff53472c0 Mon Sep 17 00:00:00 2001 From: Kegsay Date: Fri, 3 Jul 2020 12:59:00 +0100 Subject: [PATCH] Implement ExtraPublicRoomsProvider for p2p demos (#1180) * Change API and rename to ExtraPublicRoomsProvider * Make dendritejs work again * Maybe make libp2p demo work again * Linting --- build/docker/DendriteJS.Dockerfile | 2 +- clientapi/api/api.go | 11 +- clientapi/clientapi.go | 2 +- clientapi/routing/directory_public.go | 294 ++++++--------------- clientapi/routing/directory_public_test.go | 14 +- clientapi/routing/routing.go | 8 +- cmd/dendrite-demo-libp2p/main.go | 30 +-- cmd/dendrite-demo-libp2p/publicrooms.go | 156 +++++++++++ cmd/dendritejs/main.go | 2 +- cmd/dendritejs/publicrooms.go | 87 +++++- currentstateserver/api/wrapper.go | 75 ++++++ internal/setup/monolith.go | 2 +- 12 files changed, 433 insertions(+), 250 deletions(-) create mode 100644 cmd/dendrite-demo-libp2p/publicrooms.go create mode 100644 currentstateserver/api/wrapper.go diff --git a/build/docker/DendriteJS.Dockerfile b/build/docker/DendriteJS.Dockerfile index 4467c9c7..90391c45 100644 --- a/build/docker/DendriteJS.Dockerfile +++ b/build/docker/DendriteJS.Dockerfile @@ -5,7 +5,7 @@ # $ docker build -t dendritejs -f DendriteJS.Dockerfile . # $ docker run --rm -p 8888:80 dendritejs # Then visit http://localhost:8888 -FROM golang:1.13.7-alpine3.11 AS gobuild +FROM golang:1.14-alpine AS gobuild # Download and build dendrite WORKDIR /build diff --git a/clientapi/api/api.go b/clientapi/api/api.go index dae462c0..d96b032f 100644 --- a/clientapi/api/api.go +++ b/clientapi/api/api.go @@ -14,9 +14,10 @@ package api -type ExternalPublicRoomsProvider interface { - // The list of homeserver domains to query. These servers will receive a request - // via this API: https://matrix.org/docs/spec/server_server/latest#public-room-directory - // This will be called -on demand- by clients, so cache appropriately! - Homeservers() []string +import "github.com/matrix-org/gomatrixserverlib" + +// ExtraPublicRoomsProvider provides a way to inject extra published rooms into /publicRooms requests. +type ExtraPublicRoomsProvider interface { + // Rooms returns the extra rooms. This is called on-demand by clients, so cache appropriately. + Rooms() []gomatrixserverlib.PublicRoom } diff --git a/clientapi/clientapi.go b/clientapi/clientapi.go index bbce6dcc..029a73da 100644 --- a/clientapi/clientapi.go +++ b/clientapi/clientapi.go @@ -48,7 +48,7 @@ func AddPublicRoutes( transactionsCache *transactions.Cache, fsAPI federationSenderAPI.FederationSenderInternalAPI, userAPI userapi.UserInternalAPI, - extRoomsProvider api.ExternalPublicRoomsProvider, + extRoomsProvider api.ExtraPublicRoomsProvider, ) { syncProducer := &producers.SyncAPIProducer{ Producer: producer, diff --git a/clientapi/routing/directory_public.go b/clientapi/routing/directory_public.go index 64600cb4..925c1b8a 100644 --- a/clientapi/routing/directory_public.go +++ b/clientapi/routing/directory_public.go @@ -22,7 +22,6 @@ import ( "strconv" "strings" "sync" - "time" "github.com/matrix-org/dendrite/clientapi/api" "github.com/matrix-org/dendrite/clientapi/httputil" @@ -33,6 +32,11 @@ import ( "github.com/matrix-org/util" ) +var ( + cacheMu sync.Mutex + publicRoomsCache []gomatrixserverlib.PublicRoom +) + type PublicRoomReq struct { Since string `json:"since,omitempty"` Limit int16 `json:"limit,omitempty"` @@ -46,13 +50,15 @@ type filter struct { // GetPostPublicRooms implements GET and POST /publicRooms func GetPostPublicRooms( req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, + extRoomsProvider api.ExtraPublicRoomsProvider, ) util.JSONResponse { var request PublicRoomReq if fillErr := fillPublicRoomsReq(req, &request); fillErr != nil { return *fillErr } - response, err := publicRooms(req.Context(), request, rsAPI, stateAPI) + response, err := publicRooms(req.Context(), request, rsAPI, stateAPI, extRoomsProvider) if err != nil { + util.GetLogger(req.Context()).WithError(err).Errorf("failed to work out public rooms") return jsonerror.InternalServerError() } return util.JSONResponse{ @@ -61,146 +67,9 @@ func GetPostPublicRooms( } } -// GetPostPublicRoomsWithExternal is the same as GetPostPublicRooms but also mixes in public rooms from the provider supplied. -func GetPostPublicRoomsWithExternal( - req *http.Request, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, - fedClient *gomatrixserverlib.FederationClient, extRoomsProvider api.ExternalPublicRoomsProvider, -) util.JSONResponse { - var request PublicRoomReq - if fillErr := fillPublicRoomsReq(req, &request); fillErr != nil { - return *fillErr - } - response, err := publicRooms(req.Context(), request, rsAPI, stateAPI) - if err != nil { - return jsonerror.InternalServerError() - } - - if request.Since != "" { - // TODO: handle pagination tokens sensibly rather than ignoring them. - // ignore paginated requests since we don't handle them yet over federation. - // Only the initial request will contain federated rooms. - return util.JSONResponse{ - Code: http.StatusOK, - JSON: response, - } - } - - // If we have already hit the limit on the number of rooms, bail. - var limit int - if request.Limit > 0 { - limit = int(request.Limit) - len(response.Chunk) - if limit <= 0 { - return util.JSONResponse{ - Code: http.StatusOK, - JSON: response, - } - } - } - - // downcasting `limit` is safe as we know it isn't bigger than request.Limit which is int16 - fedRooms := bulkFetchPublicRoomsFromServers(req.Context(), fedClient, extRoomsProvider.Homeservers(), int16(limit)) - response.Chunk = append(response.Chunk, fedRooms...) - - // de-duplicate rooms with the same room ID. We can join the room via any of these aliases as we know these servers - // are alive and well, so we arbitrarily pick one (purposefully shuffling them to spread the load a bit) - var publicRooms []gomatrixserverlib.PublicRoom - haveRoomIDs := make(map[string]bool) - rand.Shuffle(len(response.Chunk), func(i, j int) { - response.Chunk[i], response.Chunk[j] = response.Chunk[j], response.Chunk[i] - }) - for _, r := range response.Chunk { - if haveRoomIDs[r.RoomID] { - continue - } - haveRoomIDs[r.RoomID] = true - publicRooms = append(publicRooms, r) - } - // sort by member count - sort.SliceStable(publicRooms, func(i, j int) bool { - return publicRooms[i].JoinedMembersCount > publicRooms[j].JoinedMembersCount - }) - - response.Chunk = publicRooms - - return util.JSONResponse{ - Code: http.StatusOK, - JSON: response, - } -} - -// bulkFetchPublicRoomsFromServers fetches public rooms from the list of homeservers. -// Returns a list of public rooms up to the limit specified. -func bulkFetchPublicRoomsFromServers( - ctx context.Context, fedClient *gomatrixserverlib.FederationClient, homeservers []string, limit int16, -) (publicRooms []gomatrixserverlib.PublicRoom) { - // follow pipeline semantics, see https://blog.golang.org/pipelines for more info. - // goroutines send rooms to this channel - roomCh := make(chan gomatrixserverlib.PublicRoom, int(limit)) - // signalling channel to tell goroutines to stop sending rooms and quit - done := make(chan bool) - // signalling to say when we can close the room channel - var wg sync.WaitGroup - wg.Add(len(homeservers)) - // concurrently query for public rooms - for _, hs := range homeservers { - go func(homeserverDomain string) { - defer wg.Done() - util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms") - fres, err := fedClient.GetPublicRooms(ctx, gomatrixserverlib.ServerName(homeserverDomain), int(limit), "", false, "") - if err != nil { - util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn( - "bulkFetchPublicRoomsFromServers: failed to query hs", - ) - return - } - for _, room := range fres.Chunk { - // atomically send a room or stop - select { - case roomCh <- room: - case <-done: - util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms") - return - } - } - }(hs) - } - - // Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request. - // This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be - // closed. - go func() { - wg.Wait() - util.GetLogger(ctx).Info("Cleaning up resources") - close(roomCh) - }() - - // fan-in results with timeout. We stop when we reach the limit. -FanIn: - for len(publicRooms) < int(limit) || limit == 0 { - // add a room or timeout - select { - case room, ok := <-roomCh: - if !ok { - util.GetLogger(ctx).Info("All homeservers have been queried, returning results.") - break FanIn - } - publicRooms = append(publicRooms, room) - case <-time.After(15 * time.Second): // we've waited long enough, let's tell the client what we got. - util.GetLogger(ctx).Info("Waited 15s for federated public rooms, returning early") - break FanIn - case <-ctx.Done(): // the client hung up on us, let's stop. - util.GetLogger(ctx).Info("Client hung up, returning early") - break FanIn - } - } - // tell goroutines to stop - close(done) - - return publicRooms -} - func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI.RoomserverInternalAPI, - stateAPI currentstateAPI.CurrentStateInternalAPI) (*gomatrixserverlib.RespPublicRooms, error) { + stateAPI currentstateAPI.CurrentStateInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, +) (*gomatrixserverlib.RespPublicRooms, error) { var response gomatrixserverlib.RespPublicRooms var limit int16 @@ -216,23 +85,25 @@ func publicRooms(ctx context.Context, request PublicRoomReq, rsAPI roomserverAPI util.GetLogger(ctx).WithError(err).Error("strconv.ParseInt failed") return nil, err } + err = nil - var queryRes roomserverAPI.QueryPublishedRoomsResponse - err = rsAPI.QueryPublishedRooms(ctx, &roomserverAPI.QueryPublishedRoomsRequest{}, &queryRes) - if err != nil { - util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed") - return nil, err + var rooms []gomatrixserverlib.PublicRoom + if request.Since == "" { + rooms = refreshPublicRoomCache(ctx, rsAPI, extRoomsProvider, stateAPI) + } else { + rooms = getPublicRoomsFromCache() } - response.TotalRoomCountEstimate = len(queryRes.RoomIDs) - roomIDs, prev, next := sliceInto(queryRes.RoomIDs, offset, limit) + response.TotalRoomCountEstimate = len(rooms) + + chunk, prev, next := sliceInto(rooms, offset, limit) if prev >= 0 { response.PrevBatch = "T" + strconv.Itoa(prev) } if next >= 0 { response.NextBatch = "T" + strconv.Itoa(next) } - response.Chunk, err = fillInRooms(ctx, roomIDs, stateAPI) + response.Chunk = chunk return &response, err } @@ -273,71 +144,6 @@ func fillPublicRoomsReq(httpReq *http.Request, request *PublicRoomReq) *util.JSO return nil } -// due to lots of switches -// nolint:gocyclo -func fillInRooms(ctx context.Context, roomIDs []string, stateAPI currentstateAPI.CurrentStateInternalAPI) ([]gomatrixserverlib.PublicRoom, error) { - avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""} - nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""} - canonicalTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias, StateKey: ""} - topicTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.topic", StateKey: ""} - guestTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.guest_access", StateKey: ""} - visibilityTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""} - joinRuleTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""} - - var stateRes currentstateAPI.QueryBulkStateContentResponse - err := stateAPI.QueryBulkStateContent(ctx, ¤tstateAPI.QueryBulkStateContentRequest{ - RoomIDs: roomIDs, - AllowWildcards: true, - StateTuples: []gomatrixserverlib.StateKeyTuple{ - nameTuple, canonicalTuple, topicTuple, guestTuple, visibilityTuple, joinRuleTuple, avatarTuple, - {EventType: gomatrixserverlib.MRoomMember, StateKey: "*"}, - }, - }, &stateRes) - if err != nil { - util.GetLogger(ctx).WithError(err).Error("QueryBulkStateContent failed") - return nil, err - } - chunk := make([]gomatrixserverlib.PublicRoom, len(roomIDs)) - i := 0 - for roomID, data := range stateRes.Rooms { - pub := gomatrixserverlib.PublicRoom{ - RoomID: roomID, - } - joinCount := 0 - var joinRule, guestAccess string - for tuple, contentVal := range data { - if tuple.EventType == gomatrixserverlib.MRoomMember && contentVal == "join" { - joinCount++ - continue - } - switch tuple { - case avatarTuple: - pub.AvatarURL = contentVal - case nameTuple: - pub.Name = contentVal - case topicTuple: - pub.Topic = contentVal - case canonicalTuple: - pub.CanonicalAlias = contentVal - case visibilityTuple: - pub.WorldReadable = contentVal == "world_readable" - // need both of these to determine whether guests can join - case joinRuleTuple: - joinRule = contentVal - case guestTuple: - guestAccess = contentVal - } - } - if joinRule == gomatrixserverlib.Public && guestAccess == "can_join" { - pub.GuestCanJoin = true - } - pub.JoinedMembersCount = joinCount - chunk[i] = pub - i++ - } - return chunk, nil -} - // sliceInto returns a subslice of `slice` which honours the since/limit values given. // // 0 1 2 3 4 5 6 index @@ -348,7 +154,7 @@ func fillInRooms(ctx context.Context, roomIDs []string, stateAPI currentstateAPI // limit=3&since=6 => G (prev='3', next='') // // A value of '-1' for prev/next indicates no position. -func sliceInto(slice []string, since int64, limit int16) (subset []string, prev, next int) { +func sliceInto(slice []gomatrixserverlib.PublicRoom, since int64, limit int16) (subset []gomatrixserverlib.PublicRoom, prev, next int) { prev = -1 next = -1 @@ -371,3 +177,61 @@ func sliceInto(slice []string, since int64, limit int16) (subset []string, prev, subset = slice[since:nextIndex] return } + +func refreshPublicRoomCache( + ctx context.Context, rsAPI roomserverAPI.RoomserverInternalAPI, extRoomsProvider api.ExtraPublicRoomsProvider, + stateAPI currentstateAPI.CurrentStateInternalAPI, +) []gomatrixserverlib.PublicRoom { + cacheMu.Lock() + defer cacheMu.Unlock() + var extraRooms []gomatrixserverlib.PublicRoom + if extRoomsProvider != nil { + extraRooms = extRoomsProvider.Rooms() + } + + var queryRes roomserverAPI.QueryPublishedRoomsResponse + err := rsAPI.QueryPublishedRooms(ctx, &roomserverAPI.QueryPublishedRoomsRequest{}, &queryRes) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed") + return publicRoomsCache + } + pubRooms, err := currentstateAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, stateAPI) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("PopulatePublicRooms failed") + return publicRoomsCache + } + publicRoomsCache = []gomatrixserverlib.PublicRoom{} + publicRoomsCache = append(publicRoomsCache, pubRooms...) + publicRoomsCache = append(publicRoomsCache, extraRooms...) + publicRoomsCache = dedupeAndShuffle(publicRoomsCache) + + // sort by total joined member count (big to small) + sort.SliceStable(publicRoomsCache, func(i, j int) bool { + return publicRoomsCache[i].JoinedMembersCount > publicRoomsCache[j].JoinedMembersCount + }) + return publicRoomsCache +} + +func getPublicRoomsFromCache() []gomatrixserverlib.PublicRoom { + cacheMu.Lock() + defer cacheMu.Unlock() + return publicRoomsCache +} + +func dedupeAndShuffle(in []gomatrixserverlib.PublicRoom) []gomatrixserverlib.PublicRoom { + // de-duplicate rooms with the same room ID. We can join the room via any of these aliases as we know these servers + // are alive and well, so we arbitrarily pick one (purposefully shuffling them to spread the load a bit) + var publicRooms []gomatrixserverlib.PublicRoom + haveRoomIDs := make(map[string]bool) + rand.Shuffle(len(in), func(i, j int) { + in[i], in[j] = in[j], in[i] + }) + for _, r := range in { + if haveRoomIDs[r.RoomID] { + continue + } + haveRoomIDs[r.RoomID] = true + publicRooms = append(publicRooms, r) + } + return publicRooms +} diff --git a/clientapi/routing/directory_public_test.go b/clientapi/routing/directory_public_test.go index f2a1d551..bb3912b8 100644 --- a/clientapi/routing/directory_public_test.go +++ b/clientapi/routing/directory_public_test.go @@ -3,16 +3,26 @@ package routing import ( "reflect" "testing" + + "github.com/matrix-org/gomatrixserverlib" ) +func pubRoom(name string) gomatrixserverlib.PublicRoom { + return gomatrixserverlib.PublicRoom{ + Name: name, + } +} + func TestSliceInto(t *testing.T) { - slice := []string{"a", "b", "c", "d", "e", "f", "g"} + slice := []gomatrixserverlib.PublicRoom{ + pubRoom("a"), pubRoom("b"), pubRoom("c"), pubRoom("d"), pubRoom("e"), pubRoom("f"), pubRoom("g"), + } limit := int16(3) testCases := []struct { since int64 wantPrev int wantNext int - wantSubset []string + wantSubset []gomatrixserverlib.PublicRoom }{ { since: 0, diff --git a/clientapi/routing/routing.go b/clientapi/routing/routing.go index 85c5c0d9..754fbca8 100644 --- a/clientapi/routing/routing.go +++ b/clientapi/routing/routing.go @@ -61,7 +61,7 @@ func Setup( transactionsCache *transactions.Cache, federationSender federationSenderAPI.FederationSenderInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI, - extRoomsProvider api.ExternalPublicRoomsProvider, + extRoomsProvider api.ExtraPublicRoomsProvider, ) { publicAPIMux.Handle("/client/versions", @@ -313,11 +313,7 @@ func Setup( ).Methods(http.MethodPut, http.MethodOptions) r0mux.Handle("/publicRooms", httputil.MakeExternalAPI("public_rooms", func(req *http.Request) util.JSONResponse { - /* TODO: - if extRoomsProvider != nil { - return GetPostPublicRoomsWithExternal(req, stateAPI, fedClient, extRoomsProvider) - } */ - return GetPostPublicRooms(req, rsAPI, stateAPI) + return GetPostPublicRooms(req, rsAPI, stateAPI, extRoomsProvider) }), ).Methods(http.MethodGet, http.MethodPost, http.MethodOptions) diff --git a/cmd/dendrite-demo-libp2p/main.go b/cmd/dendrite-demo-libp2p/main.go index 988f4aa7..dcda76a8 100644 --- a/cmd/dendrite-demo-libp2p/main.go +++ b/cmd/dendrite-demo-libp2p/main.go @@ -1,6 +1,4 @@ -// Copyright 2017 Vector Creations Ltd -// Copyright 2018 New Vector Ltd -// Copyright 2019-2020 The Matrix.org Foundation C.I.C. +// Copyright 2020 The Matrix.org Foundation C.I.C. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -162,13 +160,12 @@ func main() { &base.Base, federation, rsAPI, keyRing, ) rsAPI.SetFederationSenderAPI(fsAPI) - /* TODO: - publicRoomsDB, err := storage.NewPublicRoomsServerDatabaseWithPubSub(string(base.Base.Cfg.Database.PublicRoomsAPI), base.LibP2PPubsub, cfg.Matrix.ServerName) - if err != nil { - logrus.WithError(err).Panicf("failed to connect to public rooms db") - } - */ stateAPI := currentstateserver.NewInternalAPI(base.Base.Cfg, base.Base.KafkaConsumer) + provider := newPublicRoomsProvider(base.LibP2PPubsub, rsAPI, stateAPI) + err = provider.Start() + if err != nil { + panic("failed to create new public rooms provider: " + err.Error()) + } monolith := setup.Monolith{ Config: base.Base.Cfg, @@ -180,13 +177,14 @@ func main() { KafkaConsumer: base.Base.KafkaConsumer, KafkaProducer: base.Base.KafkaProducer, - AppserviceAPI: asAPI, - EDUInternalAPI: eduInputAPI, - FederationSenderAPI: fsAPI, - RoomserverAPI: rsAPI, - ServerKeyAPI: serverKeyAPI, - StateAPI: stateAPI, - UserAPI: userAPI, + AppserviceAPI: asAPI, + EDUInternalAPI: eduInputAPI, + FederationSenderAPI: fsAPI, + RoomserverAPI: rsAPI, + ServerKeyAPI: serverKeyAPI, + StateAPI: stateAPI, + UserAPI: userAPI, + ExtPublicRoomsProvider: provider, } monolith.AddAllPublicRoutes(base.Base.PublicAPIMux) diff --git a/cmd/dendrite-demo-libp2p/publicrooms.go b/cmd/dendrite-demo-libp2p/publicrooms.go new file mode 100644 index 00000000..2160ddef --- /dev/null +++ b/cmd/dendrite-demo-libp2p/publicrooms.go @@ -0,0 +1,156 @@ +// Copyright 2020 The Matrix.org Foundation C.I.C. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package main + +import ( + "context" + "encoding/json" + "fmt" + "sync" + "sync/atomic" + "time" + + currentstateAPI "github.com/matrix-org/dendrite/currentstateserver/api" + roomserverAPI "github.com/matrix-org/dendrite/roomserver/api" + + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +const MaintenanceInterval = time.Second * 10 + +type discoveredRoom struct { + time time.Time + room gomatrixserverlib.PublicRoom +} + +type publicRoomsProvider struct { + pubsub *pubsub.PubSub + topic *pubsub.Topic + subscription *pubsub.Subscription + foundRooms map[string]discoveredRoom // additional rooms we have learned about from the DHT + foundRoomsMutex sync.RWMutex // protects foundRooms + maintenanceTimer *time.Timer // + roomsAdvertised atomic.Value // stores int + rsAPI roomserverAPI.RoomserverInternalAPI + stateAPI currentstateAPI.CurrentStateInternalAPI +} + +func newPublicRoomsProvider(ps *pubsub.PubSub, rsAPI roomserverAPI.RoomserverInternalAPI, stateAPI currentstateAPI.CurrentStateInternalAPI) *publicRoomsProvider { + return &publicRoomsProvider{ + foundRooms: make(map[string]discoveredRoom), + pubsub: ps, + rsAPI: rsAPI, + stateAPI: stateAPI, + } +} + +func (p *publicRoomsProvider) Start() error { + if topic, err := p.pubsub.Join("/matrix/publicRooms"); err != nil { + return err + } else if sub, err := topic.Subscribe(); err == nil { + p.topic = topic + p.subscription = sub + go p.MaintenanceTimer() + go p.FindRooms() + p.roomsAdvertised.Store(0) + } else { + return err + } + return nil +} + +func (p *publicRoomsProvider) MaintenanceTimer() { + if p.maintenanceTimer != nil && !p.maintenanceTimer.Stop() { + <-p.maintenanceTimer.C + } + p.Interval() +} + +func (p *publicRoomsProvider) Interval() { + p.foundRoomsMutex.Lock() + for k, v := range p.foundRooms { + if time.Since(v.time) > time.Minute { + delete(p.foundRooms, k) + } + } + p.foundRoomsMutex.Unlock() + if err := p.AdvertiseRooms(); err != nil { + fmt.Println("Failed to advertise room in DHT:", err) + } + p.foundRoomsMutex.RLock() + defer p.foundRoomsMutex.RUnlock() + fmt.Println("Found", len(p.foundRooms), "room(s), advertised", p.roomsAdvertised.Load(), "room(s)") + p.maintenanceTimer = time.AfterFunc(MaintenanceInterval, p.Interval) +} + +func (p *publicRoomsProvider) AdvertiseRooms() error { + ctx := context.Background() + var queryRes roomserverAPI.QueryPublishedRoomsResponse + // Query published rooms on our server. This will not invoke clientapi.ExtraPublicRoomsProvider + err := p.rsAPI.QueryPublishedRooms(ctx, &roomserverAPI.QueryPublishedRoomsRequest{}, &queryRes) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("QueryPublishedRooms failed") + return err + } + ourRooms, err := currentstateAPI.PopulatePublicRooms(ctx, queryRes.RoomIDs, p.stateAPI) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("PopulatePublicRooms failed") + return err + } + advertised := 0 + for _, room := range ourRooms { + if j, err := json.Marshal(room); err == nil { + if err := p.topic.Publish(context.TODO(), j); err != nil { + fmt.Println("Failed to publish public room:", err) + } else { + advertised++ + } + } + } + + p.roomsAdvertised.Store(advertised) + return nil +} + +func (p *publicRoomsProvider) FindRooms() { + for { + msg, err := p.subscription.Next(context.Background()) + if err != nil { + continue + } + received := discoveredRoom{ + time: time.Now(), + } + if err := json.Unmarshal(msg.Data, &received.room); err != nil { + fmt.Println("Unmarshal error:", err) + continue + } + fmt.Printf("received %+v \n", received) + p.foundRoomsMutex.Lock() + p.foundRooms[received.room.RoomID] = received + p.foundRoomsMutex.Unlock() + } +} + +func (p *publicRoomsProvider) Rooms() (rooms []gomatrixserverlib.PublicRoom) { + p.foundRoomsMutex.RLock() + defer p.foundRoomsMutex.RUnlock() + for _, dr := range p.foundRooms { + rooms = append(rooms, dr.room) + } + return +} diff --git a/cmd/dendritejs/main.go b/cmd/dendritejs/main.go index 6e2bdafe..1443bc18 100644 --- a/cmd/dendritejs/main.go +++ b/cmd/dendritejs/main.go @@ -211,7 +211,7 @@ func main() { ) fedSenderAPI := federationsender.NewInternalAPI(base, federation, rsAPI, &keyRing) rsAPI.SetFederationSenderAPI(fedSenderAPI) - p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI) + p2pPublicRoomProvider := NewLibP2PPublicRoomsProvider(node, fedSenderAPI, federation) stateAPI := currentstateserver.NewInternalAPI(base.Cfg, base.KafkaConsumer) diff --git a/cmd/dendritejs/publicrooms.go b/cmd/dendritejs/publicrooms.go index 5032bc15..a4623ba3 100644 --- a/cmd/dendritejs/publicrooms.go +++ b/cmd/dendritejs/publicrooms.go @@ -18,22 +18,29 @@ package main import ( "context" + "sync" + "time" "github.com/matrix-org/dendrite/federationsender/api" go_http_js_libp2p "github.com/matrix-org/go-http-js-libp2p" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" ) type libp2pPublicRoomsProvider struct { node *go_http_js_libp2p.P2pLocalNode providers []go_http_js_libp2p.PeerInfo fedSender api.FederationSenderInternalAPI + fedClient *gomatrixserverlib.FederationClient } -func NewLibP2PPublicRoomsProvider(node *go_http_js_libp2p.P2pLocalNode, fedSender api.FederationSenderInternalAPI) *libp2pPublicRoomsProvider { +func NewLibP2PPublicRoomsProvider( + node *go_http_js_libp2p.P2pLocalNode, fedSender api.FederationSenderInternalAPI, fedClient *gomatrixserverlib.FederationClient, +) *libp2pPublicRoomsProvider { p := &libp2pPublicRoomsProvider{ node: node, fedSender: fedSender, + fedClient: fedClient, } node.RegisterFoundProviders(p.foundProviders) return p @@ -62,10 +69,86 @@ func (p *libp2pPublicRoomsProvider) foundProviders(peerInfos []go_http_js_libp2p p.providers = peerInfos } -func (p *libp2pPublicRoomsProvider) Homeservers() []string { +func (p *libp2pPublicRoomsProvider) Rooms() []gomatrixserverlib.PublicRoom { + return bulkFetchPublicRoomsFromServers(context.Background(), p.fedClient, p.homeservers()) +} + +func (p *libp2pPublicRoomsProvider) homeservers() []string { result := make([]string, len(p.providers)) for i := range p.providers { result[i] = p.providers[i].Id } return result } + +// bulkFetchPublicRoomsFromServers fetches public rooms from the list of homeservers. +// Returns a list of public rooms. +func bulkFetchPublicRoomsFromServers( + ctx context.Context, fedClient *gomatrixserverlib.FederationClient, homeservers []string, +) (publicRooms []gomatrixserverlib.PublicRoom) { + limit := 200 + // follow pipeline semantics, see https://blog.golang.org/pipelines for more info. + // goroutines send rooms to this channel + roomCh := make(chan gomatrixserverlib.PublicRoom, int(limit)) + // signalling channel to tell goroutines to stop sending rooms and quit + done := make(chan bool) + // signalling to say when we can close the room channel + var wg sync.WaitGroup + wg.Add(len(homeservers)) + // concurrently query for public rooms + for _, hs := range homeservers { + go func(homeserverDomain string) { + defer wg.Done() + util.GetLogger(ctx).WithField("hs", homeserverDomain).Info("Querying HS for public rooms") + fres, err := fedClient.GetPublicRooms(ctx, gomatrixserverlib.ServerName(homeserverDomain), int(limit), "", false, "") + if err != nil { + util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Warn( + "bulkFetchPublicRoomsFromServers: failed to query hs", + ) + return + } + for _, room := range fres.Chunk { + // atomically send a room or stop + select { + case roomCh <- room: + case <-done: + util.GetLogger(ctx).WithError(err).WithField("hs", homeserverDomain).Info("Interrupted whilst sending rooms") + return + } + } + }(hs) + } + + // Close the room channel when the goroutines have quit so we don't leak, but don't let it stop the in-flight request. + // This also allows the request to fail fast if all HSes experience errors as it will cause the room channel to be + // closed. + go func() { + wg.Wait() + util.GetLogger(ctx).Info("Cleaning up resources") + close(roomCh) + }() + + // fan-in results with timeout. We stop when we reach the limit. +FanIn: + for len(publicRooms) < int(limit) || limit == 0 { + // add a room or timeout + select { + case room, ok := <-roomCh: + if !ok { + util.GetLogger(ctx).Info("All homeservers have been queried, returning results.") + break FanIn + } + publicRooms = append(publicRooms, room) + case <-time.After(15 * time.Second): // we've waited long enough, let's tell the client what we got. + util.GetLogger(ctx).Info("Waited 15s for federated public rooms, returning early") + break FanIn + case <-ctx.Done(): // the client hung up on us, let's stop. + util.GetLogger(ctx).Info("Client hung up, returning early") + break FanIn + } + } + // tell goroutines to stop + close(done) + + return publicRooms +} diff --git a/currentstateserver/api/wrapper.go b/currentstateserver/api/wrapper.go new file mode 100644 index 00000000..9f7486a0 --- /dev/null +++ b/currentstateserver/api/wrapper.go @@ -0,0 +1,75 @@ +package api + +import ( + "context" + + "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" +) + +// PopulatePublicRooms extracts PublicRoom information for all the provided room IDs. The IDs are not checked to see if they are visible in the +// published room directory. +// due to lots of switches +// nolint:gocyclo +func PopulatePublicRooms(ctx context.Context, roomIDs []string, stateAPI CurrentStateInternalAPI) ([]gomatrixserverlib.PublicRoom, error) { + avatarTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.avatar", StateKey: ""} + nameTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.name", StateKey: ""} + canonicalTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomCanonicalAlias, StateKey: ""} + topicTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.topic", StateKey: ""} + guestTuple := gomatrixserverlib.StateKeyTuple{EventType: "m.room.guest_access", StateKey: ""} + visibilityTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomHistoryVisibility, StateKey: ""} + joinRuleTuple := gomatrixserverlib.StateKeyTuple{EventType: gomatrixserverlib.MRoomJoinRules, StateKey: ""} + + var stateRes QueryBulkStateContentResponse + err := stateAPI.QueryBulkStateContent(ctx, &QueryBulkStateContentRequest{ + RoomIDs: roomIDs, + AllowWildcards: true, + StateTuples: []gomatrixserverlib.StateKeyTuple{ + nameTuple, canonicalTuple, topicTuple, guestTuple, visibilityTuple, joinRuleTuple, avatarTuple, + {EventType: gomatrixserverlib.MRoomMember, StateKey: "*"}, + }, + }, &stateRes) + if err != nil { + util.GetLogger(ctx).WithError(err).Error("QueryBulkStateContent failed") + return nil, err + } + chunk := make([]gomatrixserverlib.PublicRoom, len(roomIDs)) + i := 0 + for roomID, data := range stateRes.Rooms { + pub := gomatrixserverlib.PublicRoom{ + RoomID: roomID, + } + joinCount := 0 + var joinRule, guestAccess string + for tuple, contentVal := range data { + if tuple.EventType == gomatrixserverlib.MRoomMember && contentVal == "join" { + joinCount++ + continue + } + switch tuple { + case avatarTuple: + pub.AvatarURL = contentVal + case nameTuple: + pub.Name = contentVal + case topicTuple: + pub.Topic = contentVal + case canonicalTuple: + pub.CanonicalAlias = contentVal + case visibilityTuple: + pub.WorldReadable = contentVal == "world_readable" + // need both of these to determine whether guests can join + case joinRuleTuple: + joinRule = contentVal + case guestTuple: + guestAccess = contentVal + } + } + if joinRule == gomatrixserverlib.Public && guestAccess == "can_join" { + pub.GuestCanJoin = true + } + pub.JoinedMembersCount = joinCount + chunk[i] = pub + i++ + } + return chunk, nil +} diff --git a/internal/setup/monolith.go b/internal/setup/monolith.go index ca90986e..6ace53a9 100644 --- a/internal/setup/monolith.go +++ b/internal/setup/monolith.go @@ -58,7 +58,7 @@ type Monolith struct { StateAPI currentstateAPI.CurrentStateInternalAPI // Optional - ExtPublicRoomsProvider api.ExternalPublicRoomsProvider + ExtPublicRoomsProvider api.ExtraPublicRoomsProvider } // AddAllPublicRoutes attaches all public paths to the given router