diff --git a/build/gobind/monolith.go b/build/gobind/monolith.go index d220fabc..bc638d00 100644 --- a/build/gobind/monolith.go +++ b/build/gobind/monolith.go @@ -3,6 +3,7 @@ package gobind import ( "context" "crypto/tls" + "encoding/hex" "fmt" "net" "net/http" @@ -25,12 +26,17 @@ import ( "github.com/matrix-org/gomatrixserverlib" "github.com/sirupsen/logrus" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" + "go.uber.org/atomic" ) type DendriteMonolith struct { + logger logrus.Logger YggdrasilNode *yggconn.Node StorageDirectory string listener net.Listener + httpServer *http.Server + httpListening atomic.Bool + yggListening atomic.Bool } func (m *DendriteMonolith) BaseURL() string { @@ -58,9 +64,10 @@ func (m *DendriteMonolith) DisconnectMulticastPeers() { } func (m *DendriteMonolith) Start() { - logger := logrus.Logger{ + m.logger = logrus.Logger{ Out: BindLogger{}, } + m.logger.SetOutput(BindLogger{}) logrus.SetOutput(BindLogger{}) var err error @@ -162,38 +169,39 @@ func (m *DendriteMonolith) Start() { base.UseHTTPAPIs, ) - ygg.NotifySessionNew(func(boxPubKey crypto.BoxPubKey) { - serv := gomatrixserverlib.ServerName(boxPubKey.String()) + ygg.NewSession = func(serverName gomatrixserverlib.ServerName) { + logrus.Infof("Found new session %q", serverName) + time.Sleep(time.Second * 3) req := &api.PerformServersAliveRequest{ - Servers: []gomatrixserverlib.ServerName{serv}, + Servers: []gomatrixserverlib.ServerName{serverName}, } res := &api.PerformServersAliveResponse{} if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil { - logrus.WithError(err).Warnf("Failed to notify server %q alive due to new session", serv) - } else { - logrus.Infof("Notified server %q alive due to new session", serv) + logrus.WithError(err).Warn("Failed to notify server alive due to new session") } - }) + } - ygg.NotifyLinkNew(func(boxPubKey crypto.BoxPubKey, linkType, remote string) { - serv := gomatrixserverlib.ServerName(boxPubKey.String()) + ygg.NotifyLinkNew(func(_ crypto.BoxPubKey, sigPubKey crypto.SigPubKey, linkType, remote string) { + serverName := hex.EncodeToString(sigPubKey[:]) + logrus.Infof("Found new peer %q", serverName) + time.Sleep(time.Second * 3) req := &api.PerformServersAliveRequest{ - Servers: []gomatrixserverlib.ServerName{serv}, + Servers: []gomatrixserverlib.ServerName{ + gomatrixserverlib.ServerName(serverName), + }, } res := &api.PerformServersAliveResponse{} if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil { - logrus.WithError(err).Warnf("Failed to notify server %q alive due to new peer", serv) - } else { - logrus.Infof("Notified server %q alive due to new peer", serv) + logrus.WithError(err).Warn("Failed to notify server alive due to new session") } }) // Build both ends of a HTTP multiplex. - httpServer := &http.Server{ + m.httpServer = &http.Server{ Addr: ":0", TLSNextProto: map[string]func(*http.Server, *tls.Conn, http.Handler){}, - ReadTimeout: 15 * time.Second, - WriteTimeout: 45 * time.Second, + ReadTimeout: 30 * time.Second, + WriteTimeout: 30 * time.Second, IdleTimeout: 60 * time.Second, BaseContext: func(_ net.Listener) context.Context { return context.Background() @@ -201,19 +209,33 @@ func (m *DendriteMonolith) Start() { Handler: base.BaseMux, } - go func() { - logger.Info("Listening on ", ygg.DerivedServerName()) - logger.Fatal(httpServer.Serve(ygg)) - }() - go func() { - logger.Info("Listening on ", m.BaseURL()) - logger.Fatal(httpServer.Serve(m.listener)) - }() + m.Resume() } -func (m *DendriteMonolith) Stop() { - if err := m.listener.Close(); err != nil { - logrus.Warn("Error stopping listener:", err) +func (m *DendriteMonolith) Resume() { + logrus.Info("Resuming monolith") + if listener, err := net.Listen("tcp", "localhost:65432"); err == nil { + m.listener = listener + } + if m.yggListening.CAS(false, true) { + go func() { + m.logger.Info("Listening on ", m.YggdrasilNode.DerivedServerName()) + m.logger.Fatal(m.httpServer.Serve(m.YggdrasilNode)) + m.yggListening.Store(false) + }() + } + if m.httpListening.CAS(false, true) { + go func() { + m.logger.Info("Listening on ", m.BaseURL()) + m.logger.Fatal(m.httpServer.Serve(m.listener)) + m.httpListening.Store(false) + }() + } +} + +func (m *DendriteMonolith) Suspend() { + m.logger.Info("Suspending monolith") + if err := m.httpServer.Close(); err != nil { + m.logger.Warn("Error stopping HTTP server:", err) } - m.YggdrasilNode.Stop() } diff --git a/cmd/dendrite-demo-yggdrasil/README.md b/cmd/dendrite-demo-yggdrasil/README.md new file mode 100644 index 00000000..148b9a58 --- /dev/null +++ b/cmd/dendrite-demo-yggdrasil/README.md @@ -0,0 +1,22 @@ +# Yggdrasil Demo + +This is the Dendrite Yggdrasil demo! It's easy to get started - all you need is Go 1.13 or later. + +To run the homeserver, start at the root of the Dendrite repository and run: + +``` +go run ./cmd/dendrite-demo-yggdrasil +``` + +The following command line arguments are accepted: + +* `-peer tcp://a.b.c.d:e` to specify a static Yggdrasil peer to connect to - you will need to supply this if you do not have another Yggdrasil node on your network +* `-port 12345` to specify a port to listen on for client connections + +If you need to find an internet peer, take a look at [this list](https://publicpeers.neilalexander.dev/). + +Then point your favourite Matrix client to the homeserver URL`http://localhost:8008` (or whichever `-port` you specified), create an account and log in. + +If your peering connection is operational then you should see a `Connected TCP:` line in the log output. If not then try a different peer. + +Once logged in, you should be able to open the room directory or join a room by its ID. \ No newline at end of file diff --git a/cmd/dendrite-demo-yggdrasil/main.go b/cmd/dendrite-demo-yggdrasil/main.go index 65747756..122d0266 100644 --- a/cmd/dendrite-demo-yggdrasil/main.go +++ b/cmd/dendrite-demo-yggdrasil/main.go @@ -17,6 +17,7 @@ package main import ( "context" "crypto/tls" + "encoding/hex" "flag" "fmt" "net" @@ -154,27 +155,28 @@ func main() { base.UseHTTPAPIs, ) - ygg.NotifySessionNew(func(boxPubKey crypto.BoxPubKey) { + ygg.NewSession = func(serverName gomatrixserverlib.ServerName) { + logrus.Infof("Found new session %q", serverName) req := &api.PerformServersAliveRequest{ - Servers: []gomatrixserverlib.ServerName{ - gomatrixserverlib.ServerName(boxPubKey.String()), - }, + Servers: []gomatrixserverlib.ServerName{serverName}, } res := &api.PerformServersAliveResponse{} if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil { logrus.WithError(err).Warn("Failed to notify server alive due to new session") } - }) + } - ygg.NotifyLinkNew(func(boxPubKey crypto.BoxPubKey, linkType, remote string) { + ygg.NotifyLinkNew(func(_ crypto.BoxPubKey, sigPubKey crypto.SigPubKey, linkType, remote string) { + serverName := hex.EncodeToString(sigPubKey[:]) + logrus.Infof("Found new peer %q", serverName) req := &api.PerformServersAliveRequest{ Servers: []gomatrixserverlib.ServerName{ - gomatrixserverlib.ServerName(boxPubKey.String()), + gomatrixserverlib.ServerName(serverName), }, } res := &api.PerformServersAliveResponse{} if err := fsAPI.PerformServersAlive(context.TODO(), req, res); err != nil { - logrus.WithError(err).Warn("Failed to notify server alive due to new link") + logrus.WithError(err).Warn("Failed to notify server alive due to new session") } }) diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/client.go b/cmd/dendrite-demo-yggdrasil/yggconn/client.go index 399993e3..b74468db 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/client.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/client.go @@ -46,7 +46,8 @@ func (n *Node) CreateClient( tr.RegisterProtocol( "matrix", &yggroundtripper{ inner: &http.Transport{ - ResponseHeaderTimeout: 15 * time.Second, + TLSHandshakeTimeout: 20 * time.Second, + ResponseHeaderTimeout: 10 * time.Second, IdleConnTimeout: 60 * time.Second, DialContext: n.yggdialerctx, }, @@ -62,7 +63,8 @@ func (n *Node) CreateFederationClient( tr.RegisterProtocol( "matrix", &yggroundtripper{ inner: &http.Transport{ - ResponseHeaderTimeout: 15 * time.Second, + TLSHandshakeTimeout: 20 * time.Second, + ResponseHeaderTimeout: 10 * time.Second, IdleConnTimeout: 60 * time.Second, DialContext: n.yggdialerctx, }, diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/node.go b/cmd/dendrite-demo-yggdrasil/yggconn/node.go index 89fb69b5..2bc300c8 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/node.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/node.go @@ -55,6 +55,7 @@ type Node struct { quicConfig *quic.Config sessions sync.Map // string -> quic.Session incoming chan QUICStream + NewSession func(remote gomatrixserverlib.ServerName) } func (n *Node) BuildName() string { @@ -137,7 +138,7 @@ func Setup(instanceName, storageDirectory string) (*Node, error) { MaxIncomingStreams: 0, MaxIncomingUniStreams: 0, KeepAlive: true, - MaxIdleTimeout: time.Second * 60, + MaxIdleTimeout: time.Minute * 15, HandshakeTimeout: time.Second * 15, } @@ -189,7 +190,9 @@ func (n *Node) PeerCount() int { } func (n *Node) KnownNodes() []gomatrixserverlib.ServerName { - nodemap := map[string]struct{}{} + nodemap := map[string]struct{}{ + "b5ae50589e50991dd9dd7d59c5c5f7a4521e8da5b603b7f57076272abc58b374": struct{}{}, + } for _, peer := range n.core.GetSwitchPeers() { nodemap[hex.EncodeToString(peer.SigningKey[:])] = struct{}{} } @@ -264,18 +267,10 @@ func (n *Node) SetStaticPeer(uri string) error { return nil } -func (n *Node) NotifyLinkNew(f func(boxPubKey crypto.BoxPubKey, linkType, remote string)) { +func (n *Node) NotifyLinkNew(f func(boxPubKey crypto.BoxPubKey, sigPubKey crypto.SigPubKey, linkType, remote string)) { n.core.NotifyLinkNew(f) } -func (n *Node) NotifyLinkGone(f func(boxPubKey crypto.BoxPubKey, linkType, remote string)) { +func (n *Node) NotifyLinkGone(f func(boxPubKey crypto.BoxPubKey, sigPubKey crypto.SigPubKey, linkType, remote string)) { n.core.NotifyLinkGone(f) } - -func (n *Node) NotifySessionNew(f func(boxPubKey crypto.BoxPubKey)) { - n.core.NotifySessionNew(f) -} - -func (n *Node) NotifySessionGone(f func(boxPubKey crypto.BoxPubKey)) { - n.core.NotifySessionGone(f) -} diff --git a/cmd/dendrite-demo-yggdrasil/yggconn/session.go b/cmd/dendrite-demo-yggdrasil/yggconn/session.go index 01cec813..ff77e64f 100644 --- a/cmd/dendrite-demo-yggdrasil/yggconn/session.go +++ b/cmd/dendrite-demo-yggdrasil/yggconn/session.go @@ -29,6 +29,7 @@ import ( "time" "github.com/lucas-clemente/quic-go" + "github.com/matrix-org/gomatrixserverlib" "github.com/yggdrasil-network/yggdrasil-go/src/crypto" ) @@ -56,6 +57,12 @@ func (n *Node) listenFromYgg() { func (n *Node) listenFromQUIC(session quic.Session) { n.sessions.Store(session.RemoteAddr().String(), session) defer n.sessions.Delete(session.RemoteAddr()) + if n.NewSession != nil { + if len(session.ConnectionState().PeerCertificates) == 1 { + subjectName := session.ConnectionState().PeerCertificates[0].Subject.CommonName + go n.NewSession(gomatrixserverlib.ServerName(subjectName)) + } + } for { st, err := session.AcceptStream(context.TODO()) if err != nil { diff --git a/federationsender/queue/destinationqueue.go b/federationsender/queue/destinationqueue.go index 82cb343f..57c8cff6 100644 --- a/federationsender/queue/destinationqueue.go +++ b/federationsender/queue/destinationqueue.go @@ -256,7 +256,10 @@ func (oq *destinationQueue) backgroundSend() { // PDUs waiting to be sent. By sending a message into the wake chan, // the next loop iteration will try processing these PDUs again, // subject to the backoff. - oq.notifyPDUs <- true + select { + case oq.notifyPDUs <- true: + default: + } } } else if transaction { // If we successfully sent the transaction then clear out @@ -384,7 +387,7 @@ func (oq *destinationQueue) nextTransaction( // TODO: we should check for 500-ish fails vs 400-ish here, // since we shouldn't queue things indefinitely in response // to a 400-ish error - ctx, cancel = context.WithTimeout(context.Background(), time.Second*15) + ctx, cancel = context.WithTimeout(context.Background(), time.Second*30) defer cancel() _, err = oq.client.SendTransaction(ctx, t) switch err.(type) { diff --git a/federationsender/storage/sqlite3/storage.go b/federationsender/storage/sqlite3/storage.go index 7fe6b65b..1a4715bf 100644 --- a/federationsender/storage/sqlite3/storage.go +++ b/federationsender/storage/sqlite3/storage.go @@ -239,39 +239,37 @@ func (d *Database) CleanTransactionPDUs( serverName gomatrixserverlib.ServerName, transactionID gomatrixserverlib.TransactionID, ) error { - var err error - var nids []int64 var deleteNIDs []int64 + nids, err := d.selectQueuePDUs(ctx, nil, serverName, transactionID, 50) + if err != nil { + return fmt.Errorf("d.selectQueuePDUs: %w", err) + } if err = d.queuePDUsWriter.Do(d.db, func(txn *sql.Tx) error { - nids, err = d.selectQueuePDUs(ctx, txn, serverName, transactionID, 50) - if err != nil { - return fmt.Errorf("d.selectQueuePDUs: %w", err) - } if err = d.deleteQueueTransaction(ctx, txn, serverName, transactionID); err != nil { return fmt.Errorf("d.deleteQueueTransaction: %w", err) } - var count int64 - for _, nid := range nids { - count, err = d.selectQueueReferenceJSONCount(ctx, txn, nid) - if err != nil { - return fmt.Errorf("d.selectQueueReferenceJSONCount: %w", err) - } - if count == 0 { - deleteNIDs = append(deleteNIDs, nid) - } - } return nil }); err != nil { return err } - err = d.queueJSONWriter.Do(d.db, func(txn *sql.Tx) error { - if len(deleteNIDs) > 0 { + var count int64 + for _, nid := range nids { + count, err = d.selectQueueReferenceJSONCount(ctx, nil, nid) + if err != nil { + return fmt.Errorf("d.selectQueueReferenceJSONCount: %w", err) + } + if count == 0 { + deleteNIDs = append(deleteNIDs, nid) + } + } + if len(deleteNIDs) > 0 { + err = d.queueJSONWriter.Do(d.db, func(txn *sql.Tx) error { if err = d.deleteQueueJSON(ctx, txn, deleteNIDs); err != nil { return fmt.Errorf("d.deleteQueueJSON: %w", err) } - } - return nil - }) + return nil + }) + } return err } diff --git a/go.mod b/go.mod index 5c896a37..2c348d94 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( github.com/uber-go/atomic v1.3.0 // indirect github.com/uber/jaeger-client-go v2.15.0+incompatible github.com/uber/jaeger-lib v1.5.0 - github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200708124809-79077e271c6d + github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200709151813-3c2f73ac5e86 go.uber.org/atomic v1.4.0 golang.org/x/crypto v0.0.0-20200423211502-4bdfaf469ed5 golang.org/x/mobile v0.0.0-20200629153529-33b80540585f // indirect diff --git a/go.sum b/go.sum index 1fd01ce9..e05e69c5 100644 --- a/go.sum +++ b/go.sum @@ -663,6 +663,8 @@ github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200708123331-4e0b0e723459 github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200708123331-4e0b0e723459/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE= github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200708124809-79077e271c6d h1:ly327dysc3r7lfG+AKJWPSAQmGf4h++fk+Y2dD8nDV4= github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200708124809-79077e271c6d/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE= +github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200709151813-3c2f73ac5e86 h1:l1zL1Cu/oi8MaBfcKHz4aMdSF5OWOT82SL6y5qP2law= +github.com/yggdrasil-network/yggdrasil-go v0.3.15-0.20200709151813-3c2f73ac5e86/go.mod h1:d+Nz6SPeG6kmeSPFL0cvfWfgwEql75fUnZiAONgvyBE= go.opencensus.io v0.18.0/go.mod h1:vKdFvxhtzZ9onBp9VKHK8z/sRpBMnKAsufL7wlDrCOA= go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU= go.opencensus.io v0.22.1/go.mod h1:Ap50jQcDJrx6rB6VgeeFPtuPIf3wMRvRfrfYDO6+BmA=