73d2f59e30
* Add libp2p-go * Some tweaks, tidying up (cherry picked from commit 1a5bb121f8121c4f68a27abbf25a9a35a1b7c63e) * Move p2p dockerfile (cherry picked from commit 8d3bf44ea1bf37f950034e73bcdc315afdabe79a) * Remove containsBackwardsExtremity * Fix some linter errors, update some libp2p packages/calls, other tidying up * Add -port for dendrite-p2p-demo * Use instance name as key ID * Remove P2P demo docker stuff, no longer needed now that we have SQLite * Remove Dockerfile-p2p too * Remove p2p logic from dendrite-monolith-server * Inject publicRoomsDB in publicroomsapi Inject publicRoomsDB instead of switching on base.libP2P. See: https://github.com/matrix-org/dendrite/pull/956/files?file-filters%5B%5D=.go#r406276914 * Fix lint warning * Extract mDNSListener from base.go * Extract CreateFederationClient into demo * Create P2PDendrite from BaseDendrite Extract logic specific to P2PDendrite from base.go * Set base.go to upstream/master * Move pubsub to demo cmd * Move PostgreswithDHT to cmd * Remove unstable features * Add copyrights * Move libp2pvalidator into p2pdendrite * Rename dendrite-p2p-demo -> dendrite-demo-libp2p * Update copyrights * go mod tidy Co-authored-by: Neil Alexander <neilalexander@users.noreply.github.com>
179 lines
5.8 KiB
Go
179 lines
5.8 KiB
Go
// Copyright 2020 The Matrix.org Foundation C.I.C.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package postgreswithpubsub
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/matrix-org/dendrite/publicroomsapi/storage/postgres"
|
|
"github.com/matrix-org/gomatrixserverlib"
|
|
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
)
|
|
|
|
// MaintenanceInterval is the delay handed to time.AfterFunc when Interval
// schedules its next run.
const MaintenanceInterval = 10 * time.Second
|
|
|
|
type discoveredRoom struct {
|
|
time time.Time
|
|
room gomatrixserverlib.PublicRoom
|
|
}
|
|
|
|
// PublicRoomsServerDatabase represents a public rooms server database.
|
|
type PublicRoomsServerDatabase struct {
|
|
postgres.PublicRoomsServerDatabase //
|
|
pubsub *pubsub.PubSub //
|
|
subscription *pubsub.Subscription //
|
|
foundRooms map[string]discoveredRoom // additional rooms we have learned about from the DHT
|
|
foundRoomsMutex sync.RWMutex // protects foundRooms
|
|
maintenanceTimer *time.Timer //
|
|
roomsAdvertised atomic.Value // stores int
|
|
}
|
|
|
|
// NewPublicRoomsServerDatabase creates a new public rooms server database.
|
|
func NewPublicRoomsServerDatabase(dataSourceName string, pubsub *pubsub.PubSub) (*PublicRoomsServerDatabase, error) {
|
|
pg, err := postgres.NewPublicRoomsServerDatabase(dataSourceName)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
provider := PublicRoomsServerDatabase{
|
|
pubsub: pubsub,
|
|
PublicRoomsServerDatabase: *pg,
|
|
foundRooms: make(map[string]discoveredRoom),
|
|
}
|
|
if topic, err := pubsub.Join("/matrix/publicRooms"); err != nil {
|
|
return nil, err
|
|
} else if sub, err := topic.Subscribe(); err == nil {
|
|
provider.subscription = sub
|
|
go provider.MaintenanceTimer()
|
|
go provider.FindRooms()
|
|
provider.roomsAdvertised.Store(0)
|
|
return &provider, nil
|
|
} else {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
func (d *PublicRoomsServerDatabase) GetRoomVisibility(ctx context.Context, roomID string) (bool, error) {
|
|
return d.PublicRoomsServerDatabase.GetRoomVisibility(ctx, roomID)
|
|
}
|
|
|
|
func (d *PublicRoomsServerDatabase) SetRoomVisibility(ctx context.Context, visible bool, roomID string) error {
|
|
d.MaintenanceTimer()
|
|
return d.PublicRoomsServerDatabase.SetRoomVisibility(ctx, visible, roomID)
|
|
}
|
|
|
|
func (d *PublicRoomsServerDatabase) CountPublicRooms(ctx context.Context) (int64, error) {
|
|
d.foundRoomsMutex.RLock()
|
|
defer d.foundRoomsMutex.RUnlock()
|
|
return int64(len(d.foundRooms)), nil
|
|
}
|
|
|
|
func (d *PublicRoomsServerDatabase) GetPublicRooms(ctx context.Context, offset int64, limit int16, filter string) ([]gomatrixserverlib.PublicRoom, error) {
|
|
var rooms []gomatrixserverlib.PublicRoom
|
|
if filter == "__local__" {
|
|
if r, err := d.PublicRoomsServerDatabase.GetPublicRooms(ctx, offset, limit, ""); err == nil {
|
|
rooms = append(rooms, r...)
|
|
} else {
|
|
return []gomatrixserverlib.PublicRoom{}, err
|
|
}
|
|
} else {
|
|
d.foundRoomsMutex.RLock()
|
|
defer d.foundRoomsMutex.RUnlock()
|
|
for _, room := range d.foundRooms {
|
|
rooms = append(rooms, room.room)
|
|
}
|
|
}
|
|
return rooms, nil
|
|
}
|
|
|
|
func (d *PublicRoomsServerDatabase) UpdateRoomFromEvents(ctx context.Context, eventsToAdd []gomatrixserverlib.Event, eventsToRemove []gomatrixserverlib.Event) error {
|
|
return d.PublicRoomsServerDatabase.UpdateRoomFromEvents(ctx, eventsToAdd, eventsToRemove)
|
|
}
|
|
|
|
func (d *PublicRoomsServerDatabase) UpdateRoomFromEvent(ctx context.Context, event gomatrixserverlib.Event) error {
|
|
return d.PublicRoomsServerDatabase.UpdateRoomFromEvent(ctx, event)
|
|
}
|
|
|
|
func (d *PublicRoomsServerDatabase) MaintenanceTimer() {
|
|
if d.maintenanceTimer != nil && !d.maintenanceTimer.Stop() {
|
|
<-d.maintenanceTimer.C
|
|
}
|
|
d.Interval()
|
|
}
|
|
|
|
func (d *PublicRoomsServerDatabase) Interval() {
|
|
d.foundRoomsMutex.Lock()
|
|
for k, v := range d.foundRooms {
|
|
if time.Since(v.time) > time.Minute {
|
|
delete(d.foundRooms, k)
|
|
}
|
|
}
|
|
d.foundRoomsMutex.Unlock()
|
|
if err := d.AdvertiseRooms(); err != nil {
|
|
fmt.Println("Failed to advertise room in DHT:", err)
|
|
}
|
|
d.foundRoomsMutex.RLock()
|
|
defer d.foundRoomsMutex.RUnlock()
|
|
fmt.Println("Found", len(d.foundRooms), "room(s), advertised", d.roomsAdvertised.Load(), "room(s)")
|
|
d.maintenanceTimer = time.AfterFunc(MaintenanceInterval, d.Interval)
|
|
}
|
|
|
|
func (d *PublicRoomsServerDatabase) AdvertiseRooms() error {
|
|
dbCtx, dbCancel := context.WithTimeout(context.Background(), 3*time.Second)
|
|
_ = dbCancel
|
|
ourRooms, err := d.GetPublicRooms(dbCtx, 0, 1024, "__local__")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
advertised := 0
|
|
for _, room := range ourRooms {
|
|
if j, err := json.Marshal(room); err == nil {
|
|
if topic, err := d.pubsub.Join("/matrix/publicRooms"); err != nil {
|
|
fmt.Println("Failed to subscribe to topic:", err)
|
|
} else if err := topic.Publish(context.TODO(), j); err != nil {
|
|
fmt.Println("Failed to publish public room:", err)
|
|
} else {
|
|
advertised++
|
|
}
|
|
}
|
|
}
|
|
|
|
d.roomsAdvertised.Store(advertised)
|
|
return nil
|
|
}
|
|
|
|
func (d *PublicRoomsServerDatabase) FindRooms() {
|
|
for {
|
|
msg, err := d.subscription.Next(context.Background())
|
|
if err != nil {
|
|
continue
|
|
}
|
|
received := discoveredRoom{
|
|
time: time.Now(),
|
|
}
|
|
if err := json.Unmarshal(msg.Data, &received.room); err != nil {
|
|
fmt.Println("Unmarshal error:", err)
|
|
continue
|
|
}
|
|
d.foundRoomsMutex.Lock()
|
|
d.foundRooms[received.room.RoomID] = received
|
|
d.foundRoomsMutex.Unlock()
|
|
}
|
|
}
|