From 07c821daecd023a085224552b899661664201086 Mon Sep 17 00:00:00 2001 From: Robert Swain Date: Wed, 31 May 2017 17:41:42 +0200 Subject: [PATCH 01/10] mediaapi/writers: Fix InternalServerError responses --- .../dendrite/mediaapi/writers/download.go | 36 +++++++------------ .../dendrite/mediaapi/writers/upload.go | 6 ++-- 2 files changed, 14 insertions(+), 28 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go index 9f66409c..024f755b 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go @@ -119,10 +119,8 @@ func (r *downloadRequest) doDownload(w http.ResponseWriter, cfg *config.MediaAPI mediaMetadata, err := db.GetMediaMetadata(r.MediaMetadata.MediaID, r.MediaMetadata.Origin) if err != nil { r.Logger.WithError(err).Error("Error querying the database.") - return &util.JSONResponse{ - Code: 500, - JSON: jsonerror.InternalServerError(), - } + resErr := jsonerror.InternalServerError() + return &resErr } if mediaMetadata == nil { if r.MediaMetadata.Origin == cfg.ServerName { @@ -149,27 +147,21 @@ func (r *downloadRequest) respondFromLocalFile(w http.ResponseWriter, absBasePat filePath, err := fileutils.GetPathFromBase64Hash(r.MediaMetadata.Base64Hash, absBasePath) if err != nil { r.Logger.WithError(err).Error("Failed to get file path from metadata") - return &util.JSONResponse{ - Code: 500, - JSON: jsonerror.InternalServerError(), - } + resErr := jsonerror.InternalServerError() + return &resErr } file, err := os.Open(filePath) defer file.Close() if err != nil { r.Logger.WithError(err).Error("Failed to open file") - return &util.JSONResponse{ - Code: 500, - JSON: jsonerror.InternalServerError(), - } + resErr := jsonerror.InternalServerError() + return &resErr } stat, err := file.Stat() if err != nil { r.Logger.WithError(err).Error("Failed to stat file") - return &util.JSONResponse{ - Code: 500, - JSON: jsonerror.InternalServerError(), - } + resErr := jsonerror.InternalServerError() + return &resErr } if r.MediaMetadata.FileSizeBytes > 0 && int64(r.MediaMetadata.FileSizeBytes) != stat.Size() { @@ -177,10 +169,8 @@ func (r *downloadRequest) respondFromLocalFile(w http.ResponseWriter, absBasePat "fileSizeDatabase": r.MediaMetadata.FileSizeBytes, "fileSizeDisk": stat.Size(), }).Warn("File size in database and on-disk differ.") - return &util.JSONResponse{ - Code: 500, - JSON: jsonerror.InternalServerError(), - } + resErr := jsonerror.InternalServerError() + return &resErr } r.Logger.WithFields(log.Fields{ @@ -202,10 +192,8 @@ func (r *downloadRequest) respondFromLocalFile(w http.ResponseWriter, absBasePat if bytesResponded, err := io.Copy(w, file); err != nil { r.Logger.WithError(err).Warn("Failed to copy from cache") if bytesResponded == 0 { - return &util.JSONResponse{ - Code: 500, - JSON: jsonerror.NotFound(fmt.Sprintf("Failed to respond with file with media ID %q", r.MediaMetadata.MediaID)), - } + resErr := jsonerror.InternalServerError() + return &resErr } // If we have written any data then we have already responded with 200 OK and all we can do is close the connection return nil diff --git a/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go b/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go index dabd5007..f1838a55 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/writers/upload.go @@ -136,10 +136,8 @@ func (r *uploadRequest) doUpload(reqReader io.Reader, cfg *config.MediaAPI, db * mediaMetadata, err := db.GetMediaMetadata(r.MediaMetadata.MediaID, r.MediaMetadata.Origin) if err != nil { r.Logger.WithError(err).Error("Error querying the database.") - return &util.JSONResponse{ - Code: 500, - JSON: jsonerror.InternalServerError(), - } + resErr := jsonerror.InternalServerError() + return &resErr } if mediaMetadata != nil { From 923013aa5598e057fdcf64edc8a553bda590160a Mon Sep 17 00:00:00 2001 From: Robert Swain Date: Wed, 31 May 2017 17:56:11 +0200 Subject: [PATCH 02/10] mediaapi: Add support for downloading of remote files --- .../dendrite/mediaapi/routing/routing.go | 3 +- .../dendrite/mediaapi/types/types.go | 10 +- .../dendrite/mediaapi/writers/download.go | 324 +++++++++++++++++- 3 files changed, 329 insertions(+), 8 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/mediaapi/routing/routing.go b/src/github.com/matrix-org/dendrite/mediaapi/routing/routing.go index 0c1dce6f..7641109c 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/routing/routing.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/routing/routing.go @@ -16,7 +16,6 @@ package routing import ( "net/http" - "sync" "github.com/gorilla/mux" "github.com/matrix-org/dendrite/common" @@ -42,7 +41,7 @@ func Setup(servMux *http.ServeMux, httpClient *http.Client, cfg *config.MediaAPI })) activeRemoteRequests := &types.ActiveRemoteRequests{ - MXCToCond: map[string]*sync.Cond{}, + MXCToResult: map[string]*types.RemoteRequestResult{}, } r0mux.Handle("/download/{serverName}/{mediaId}", prometheus.InstrumentHandler("download", http.HandlerFunc(func(w http.ResponseWriter, req *http.Request) { diff --git a/src/github.com/matrix-org/dendrite/mediaapi/types/types.go b/src/github.com/matrix-org/dendrite/mediaapi/types/types.go index ac18f5fe..82cc1d7c 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/types/types.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/types/types.go @@ -59,10 +59,18 @@ type MediaMetadata struct { UserID MatrixUserID } +// RemoteRequestResult is used for broadcasting the result of a request for a remote file to routines waiting on the condition +type RemoteRequestResult struct { + // Condition used for the requester to signal the result to all other routines waiting on this condition + Cond *sync.Cond + // Resulting HTTP status code from the request + Result int +} + // ActiveRemoteRequests is a lockable map of media URIs requested from remote homeservers // It is used for ensuring multiple requests for the same file do not clobber each other. type ActiveRemoteRequests struct { sync.Mutex // The string key is an mxc:// URL - MXCToCond map[string]*sync.Cond + MXCToResult map[string]*RemoteRequestResult } diff --git a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go index 024f755b..35eca6b6 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go @@ -18,10 +18,15 @@ import ( "encoding/json" "fmt" "io" + "net" "net/http" + "net/url" "os" + "path/filepath" "regexp" "strconv" + "strings" + "sync" log "github.com/Sirupsen/logrus" "github.com/matrix-org/dendrite/clientapi/jsonerror" @@ -47,6 +52,10 @@ type downloadRequest struct { // Download implements /download // Files from this server (i.e. origin == cfg.ServerName) are served directly +// Files from remote servers (i.e. origin != cfg.ServerName) are cached locally. +// If they are present in the cache, they are served directly. +// If they are not present in the cache, they are obtained from the remote server and +// simultaneously served back to the client and written into the cache. func Download(w http.ResponseWriter, req *http.Request, origin gomatrixserverlib.ServerName, mediaID types.MediaID, cfg *config.MediaAPI, db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests) { r := &downloadRequest{ MediaMetadata: &types.MediaMetadata{ @@ -130,11 +139,8 @@ func (r *downloadRequest) doDownload(w http.ResponseWriter, cfg *config.MediaAPI JSON: jsonerror.NotFound(fmt.Sprintf("File with media ID %q does not exist", r.MediaMetadata.MediaID)), } } - // TODO: If we do not have a record and the origin is remote, we need to fetch it and respond with that file - return &util.JSONResponse{ - Code: 404, - JSON: jsonerror.NotFound(fmt.Sprintf("File with media ID %q does not exist", r.MediaMetadata.MediaID)), - } + // If we do not have a record and the origin is remote, we need to fetch it and respond with that file + return r.respondFromRemoteFile(w, cfg, db, activeRemoteRequests) } // If we have a record, we can respond from the local file r.MediaMetadata = mediaMetadata @@ -200,3 +206,311 @@ func (r *downloadRequest) respondFromLocalFile(w http.ResponseWriter, absBasePat } return nil } + +// respondFromRemoteFile fetches the remote file, caches it locally and responds from that local file +// A hash map of active remote requests to a struct containing a sync.Cond is used to only download remote files once, +// regardless of how many download requests are received. +// Returns a util.JSONResponse error in case of error +func (r *downloadRequest) respondFromRemoteFile(w http.ResponseWriter, cfg *config.MediaAPI, db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests) *util.JSONResponse { + // Note: getMediaMetadataForRemoteFile uses mutexes and conditions from activeRemoteRequests + mediaMetadata, resErr := r.getMediaMetadataForRemoteFile(db, activeRemoteRequests) + if resErr != nil { + return resErr + } else if mediaMetadata != nil { + // If we have a record, we can respond from the local file + r.MediaMetadata = mediaMetadata + } else { + // If we do not have a record, we need to fetch the remote file first and then respond from the local file + // Note: getRemoteFile uses mutexes and conditions from activeRemoteRequests + if resErr := r.getRemoteFile(cfg.AbsBasePath, cfg.MaxFileSizeBytes, db, activeRemoteRequests); resErr != nil { + return resErr + } + } + return r.respondFromLocalFile(w, cfg.AbsBasePath) +} + +func (r *downloadRequest) getMediaMetadataForRemoteFile(db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests) (*types.MediaMetadata, *util.JSONResponse) { + activeRemoteRequests.Lock() + defer activeRemoteRequests.Unlock() + + // check if we have a record of the media in our database + mediaMetadata, err := db.GetMediaMetadata(r.MediaMetadata.MediaID, r.MediaMetadata.Origin) + if err != nil { + r.Logger.WithError(err).Error("Error querying the database.") + resErr := jsonerror.InternalServerError() + return nil, &resErr + } + + if mediaMetadata != nil { + // If we have a record, we can respond from the local file + return mediaMetadata, nil + } + + // No record was found + + // Check if there is an active remote request for the file + mxcURL := "mxc://" + string(r.MediaMetadata.Origin) + "/" + string(r.MediaMetadata.MediaID) + if activeRemoteRequestResult, ok := activeRemoteRequests.MXCToResult[mxcURL]; ok { + r.Logger.Info("Waiting for another goroutine to fetch the remote file.") + + activeRemoteRequestResult.Cond.Wait() + activeRemoteRequests.Unlock() + // NOTE: there is still a deferred Unlock() that will unlock this + activeRemoteRequests.Lock() + + // check if we have a record of the media in our database + mediaMetadata, err := db.GetMediaMetadata(r.MediaMetadata.MediaID, r.MediaMetadata.Origin) + if err != nil { + r.Logger.WithError(err).Error("Error querying the database.") + resErr := jsonerror.InternalServerError() + return nil, &resErr + } + + if mediaMetadata != nil { + // If we have a record, we can respond from the local file + return mediaMetadata, nil + } + + // Note: if the result was 200, we shouldn't get here + switch activeRemoteRequestResult.Result { + case 404: + return nil, &util.JSONResponse{ + Code: 404, + JSON: jsonerror.NotFound("File not found."), + } + case 500: + r.Logger.Error("Other goroutine failed to fetch the remote file.") + resErr := jsonerror.InternalServerError() + return nil, &resErr + default: + r.Logger.Error("Other goroutine failed to fetch the remote file.") + return nil, &util.JSONResponse{ + Code: activeRemoteRequestResult.Result, + JSON: jsonerror.Unknown("Failed to fetch file from remote server."), + } + } + } + + // No active remote request so create one + activeRemoteRequests.MXCToResult[mxcURL] = &types.RemoteRequestResult{ + Cond: &sync.Cond{L: activeRemoteRequests}, + } + return nil, nil +} + +// getRemoteFile fetches the file from the remote server and stores its metadata in the database +// Only the owner of the activeRemoteRequestResult for this origin and media ID should call this function. +func (r *downloadRequest) getRemoteFile(absBasePath types.Path, maxFileSizeBytes types.FileSizeBytes, db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests) *util.JSONResponse { + // Wake up other goroutines after this function returns. + isError := true + var result int + defer func() { + if isError { + // If an error happens, the lock MUST NOT have been taken, isError MUST be true and so the lock is taken here. + activeRemoteRequests.Lock() + } + defer activeRemoteRequests.Unlock() + mxcURL := "mxc://" + string(r.MediaMetadata.Origin) + "/" + string(r.MediaMetadata.MediaID) + if activeRemoteRequestResult, ok := activeRemoteRequests.MXCToResult[mxcURL]; ok { + r.Logger.Info("Signalling other goroutines waiting for this goroutine to fetch the file.") + if result == 0 { + r.Logger.Error("Invalid result, treating as InternalServerError") + result = 500 + } + activeRemoteRequestResult.Result = result + activeRemoteRequestResult.Cond.Broadcast() + } + delete(activeRemoteRequests.MXCToResult, mxcURL) + }() + + finalPath, duplicate, resErr := r.fetchRemoteFile(absBasePath, maxFileSizeBytes) + if resErr != nil { + result = resErr.Code + return resErr + } + + // NOTE: Writing the metadata to the media repository database and removing the mxcURL from activeRemoteRequests needs to be atomic. + // If it were not atomic, a new request for the same file could come in in routine A and check the database before the INSERT. + // Routine B which was fetching could then have its INSERT complete and remove the mxcURL from the activeRemoteRequests. + // If routine A then checked the activeRemoteRequests it would think it needed to fetch the file when it's already in the database. + // The locking below mitigates this situation. + + // NOTE: The following two lines MUST remain together! + // isError == true causes the lock to be taken in a deferred function! + activeRemoteRequests.Lock() + isError = false + + r.Logger.WithFields(log.Fields{ + "Base64Hash": r.MediaMetadata.Base64Hash, + "UploadName": r.MediaMetadata.UploadName, + "FileSizeBytes": r.MediaMetadata.FileSizeBytes, + "Content-Type": r.MediaMetadata.ContentType, + }).Info("Storing file metadata to media repository database") + + // FIXME: timeout db request + if err := db.StoreMediaMetadata(r.MediaMetadata); err != nil { + // If the file is a duplicate (has the same hash as an existing file) then + // there is valid metadata in the database for that file. As such we only + // remove the file if it is not a duplicate. + if duplicate == false { + finalDir := filepath.Dir(string(finalPath)) + fileutils.RemoveDir(types.Path(finalDir), r.Logger) + } + // NOTE: It should really not be possible to fail the uniqueness test here so + // there is no need to handle that separately + resErr := jsonerror.InternalServerError() + result = resErr.Code + return &resErr + } + + // TODO: generate thumbnails + + r.Logger.WithFields(log.Fields{ + "UploadName": r.MediaMetadata.UploadName, + "Base64Hash": r.MediaMetadata.Base64Hash, + "FileSizeBytes": r.MediaMetadata.FileSizeBytes, + "Content-Type": r.MediaMetadata.ContentType, + }).Infof("Remote file cached") + + result = 200 + return nil +} + +func (r *downloadRequest) fetchRemoteFile(absBasePath types.Path, maxFileSizeBytes types.FileSizeBytes) (types.Path, bool, *util.JSONResponse) { + r.Logger.Info("Fetching remote file") + + // create request for remote file + resp, resErr := r.createRemoteRequest() + if resErr != nil { + return "", false, resErr + } + defer resp.Body.Close() + + // get metadata from request and set metadata on response + contentLength, err := strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64) + if err != nil { + r.Logger.WithError(err).Warn("Failed to parse content length") + return "", false, &util.JSONResponse{ + Code: 502, + JSON: jsonerror.Unknown("Invalid response from remote server"), + } + } + if contentLength > int64(maxFileSizeBytes) { + return "", false, &util.JSONResponse{ + Code: 413, + JSON: jsonerror.Unknown(fmt.Sprintf("Remote file is too large (%v > %v bytes)", contentLength, maxFileSizeBytes)), + } + } + r.MediaMetadata.FileSizeBytes = types.FileSizeBytes(contentLength) + r.MediaMetadata.ContentType = types.ContentType(resp.Header.Get("Content-Type")) + r.MediaMetadata.UploadName = types.Filename(contentDispositionToFilename(resp.Header.Get("Content-Disposition"))) + + r.Logger.Info("Transferring remote file") + + // The file data is hashed but is NOT used as the MediaID, unlike in Upload. The hash is useful as a + // method of deduplicating files to save storage, as well as a way to conduct + // integrity checks on the file data in the repository. + // Data is truncated to maxFileSizeBytes. Content-Length was reported as 0 < Content-Length <= maxFileSizeBytes so this is OK. + hash, bytesWritten, tmpDir, err := fileutils.WriteTempFile(resp.Body, maxFileSizeBytes, absBasePath) + if err != nil { + r.Logger.WithError(err).WithFields(log.Fields{ + "MaxFileSizeBytes": maxFileSizeBytes, + }).Warn("Error while downloading file from remote server") + fileutils.RemoveDir(tmpDir, r.Logger) + return "", false, &util.JSONResponse{ + Code: 502, + JSON: jsonerror.Unknown("File could not be downloaded from remote server"), + } + } + + r.Logger.Info("Remote file transferred") + + // It's possible the bytesWritten to the temporary file is different to the reported Content-Length from the remote + // request's response. bytesWritten is therefore used as it is what would be sent to clients when reading from the local + // file. + r.MediaMetadata.FileSizeBytes = types.FileSizeBytes(bytesWritten) + r.MediaMetadata.Base64Hash = hash + + // The database is the source of truth so we need to have moved the file first + finalPath, duplicate, err := fileutils.MoveFileWithHashCheck(tmpDir, r.MediaMetadata, absBasePath, r.Logger) + if err != nil { + r.Logger.WithError(err).Error("Failed to move file.") + resErr := jsonerror.InternalServerError() + return "", false, &resErr + } + if duplicate { + r.Logger.WithField("dst", finalPath).Info("File was stored previously - discarding duplicate") + // Continue on to store the metadata in the database + } + + return types.Path(finalPath), duplicate, nil +} + +func (r *downloadRequest) createRemoteRequest() (*http.Response, *util.JSONResponse) { + dnsResult, err := gomatrixserverlib.LookupServer(r.MediaMetadata.Origin) + if err != nil { + if dnsErr, ok := err.(*net.DNSError); ok && dnsErr.Timeout() { + return nil, &util.JSONResponse{ + Code: 504, + JSON: jsonerror.Unknown(fmt.Sprintf("DNS look up for homeserver at %v timed out", r.MediaMetadata.Origin)), + } + } + resErr := jsonerror.InternalServerError() + return nil, &resErr + } + url := "https://" + strings.Trim(dnsResult.SRVRecords[0].Target, ".") + ":" + strconv.Itoa(int(dnsResult.SRVRecords[0].Port)) + + r.Logger.WithField("URL", url).Info("Connecting to remote") + + remoteReqAddr := url + "/_matrix/media/v1/download/" + string(r.MediaMetadata.Origin) + "/" + string(r.MediaMetadata.MediaID) + remoteReq, err := http.NewRequest("GET", remoteReqAddr, nil) + if err != nil { + resErr := jsonerror.InternalServerError() + return nil, &resErr + } + + remoteReq.Header.Set("Host", string(r.MediaMetadata.Origin)) + + client := http.Client{} + resp, err := client.Do(remoteReq) + if err != nil { + r.Logger.Warn("Failed to execute request for remote file") + return nil, &util.JSONResponse{ + Code: 502, + JSON: jsonerror.Unknown(fmt.Sprintf("File with media ID %q could not be downloaded from %q", r.MediaMetadata.MediaID, r.MediaMetadata.Origin)), + } + } + + if resp.StatusCode != 200 { + if resp.StatusCode == 404 { + return nil, &util.JSONResponse{ + Code: 404, + JSON: jsonerror.NotFound(fmt.Sprintf("File with media ID %q does not exist", r.MediaMetadata.MediaID)), + } + } + r.Logger.WithFields(log.Fields{ + "StatusCode": resp.StatusCode, + }).Warn("Received error response") + return nil, &util.JSONResponse{ + Code: 502, + JSON: jsonerror.Unknown(fmt.Sprintf("File with media ID %q could not be downloaded from %q", r.MediaMetadata.MediaID, r.MediaMetadata.Origin)), + } + } + + return resp, nil +} + +var contentDispositionRegex = regexp.MustCompile("filename([*])?=(utf-8'')?([A-Za-z0-9._-]+)") + +func contentDispositionToFilename(contentDisposition string) types.Filename { + filename := "" + if matches := contentDispositionRegex.FindStringSubmatch(contentDisposition); len(matches) == 4 { + // Note: the filename should already be escaped. If not, unescape should be close to a no-op. This way filename is sure to be safe. + unescaped, err := url.PathUnescape(matches[3]) + if err != nil { + unescaped = matches[3] + } + filename = url.PathEscape(unescaped) + } + return types.Filename(filename) +} From a3b1c7535a02c3c3aefd1a7d7d13652a3ba04988 Mon Sep 17 00:00:00 2001 From: Robert Swain Date: Thu, 1 Jun 2017 08:39:35 +0200 Subject: [PATCH 03/10] mediaapi/writers/download: Remove unnecesary Unlock, Lock after Cond.Wait --- .../matrix-org/dendrite/mediaapi/writers/download.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go index 35eca6b6..aecb1dd3 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go @@ -253,10 +253,8 @@ func (r *downloadRequest) getMediaMetadataForRemoteFile(db *storage.Database, ac if activeRemoteRequestResult, ok := activeRemoteRequests.MXCToResult[mxcURL]; ok { r.Logger.Info("Waiting for another goroutine to fetch the remote file.") + // NOTE: Wait unlocks and locks again internally. There is still a deferred Unlock() that will unlock this. activeRemoteRequestResult.Cond.Wait() - activeRemoteRequests.Unlock() - // NOTE: there is still a deferred Unlock() that will unlock this - activeRemoteRequests.Lock() // check if we have a record of the media in our database mediaMetadata, err := db.GetMediaMetadata(r.MediaMetadata.MediaID, r.MediaMetadata.Origin) From 4457ebddca3f8ccb0b9e71f7a16a948971aa2585 Mon Sep 17 00:00:00 2001 From: Robert Swain Date: Thu, 1 Jun 2017 12:32:15 +0200 Subject: [PATCH 04/10] mediaapi/writers/download: Rework remote file download synchronisation Avoid locking around db requests by only locking around active requests and always creating an active request if there is none. A nice side effect of this is that if many parallel requests for remote media come in (a common case) then only one database query is made for the duration of the query. --- .../dendrite/mediaapi/types/types.go | 7 +- .../dendrite/mediaapi/writers/download.go | 160 +++++++----------- 2 files changed, 69 insertions(+), 98 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/mediaapi/types/types.go b/src/github.com/matrix-org/dendrite/mediaapi/types/types.go index 82cc1d7c..d54bcdf6 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/types/types.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/types/types.go @@ -18,6 +18,7 @@ import ( "sync" "github.com/matrix-org/gomatrixserverlib" + "github.com/matrix-org/util" ) // FileSizeBytes is a file size in bytes @@ -63,8 +64,10 @@ type MediaMetadata struct { type RemoteRequestResult struct { // Condition used for the requester to signal the result to all other routines waiting on this condition Cond *sync.Cond - // Resulting HTTP status code from the request - Result int + // MediaMetadata of the requested file to avoid querying the database for every waiting routine + MediaMetadata *MediaMetadata + // An error in util.JSONResponse form. nil in case of no error. + ErrorResponse *util.JSONResponse } // ActiveRemoteRequests is a lockable map of media URIs requested from remote homeservers diff --git a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go index aecb1dd3..eac5d764 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go @@ -140,10 +140,14 @@ func (r *downloadRequest) doDownload(w http.ResponseWriter, cfg *config.MediaAPI } } // If we do not have a record and the origin is remote, we need to fetch it and respond with that file - return r.respondFromRemoteFile(w, cfg, db, activeRemoteRequests) + resErr := r.getRemoteFile(cfg, db, activeRemoteRequests) + if resErr != nil { + return resErr + } + } else { + // If we have a record, we can respond from the local file + r.MediaMetadata = mediaMetadata } - // If we have a record, we can respond from the local file - r.MediaMetadata = mediaMetadata return r.respondFromLocalFile(w, cfg.AbsBasePath) } @@ -207,137 +211,103 @@ func (r *downloadRequest) respondFromLocalFile(w http.ResponseWriter, absBasePat return nil } -// respondFromRemoteFile fetches the remote file, caches it locally and responds from that local file +// getRemoteFile fetches the remote file and caches it locally // A hash map of active remote requests to a struct containing a sync.Cond is used to only download remote files once, // regardless of how many download requests are received. +// Note: The named errorResponse return variable is used in a deferred broadcast of the metadata and error response to waiting goroutines. // Returns a util.JSONResponse error in case of error -func (r *downloadRequest) respondFromRemoteFile(w http.ResponseWriter, cfg *config.MediaAPI, db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests) *util.JSONResponse { - // Note: getMediaMetadataForRemoteFile uses mutexes and conditions from activeRemoteRequests - mediaMetadata, resErr := r.getMediaMetadataForRemoteFile(db, activeRemoteRequests) +func (r *downloadRequest) getRemoteFile(cfg *config.MediaAPI, db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests) (errorResponse *util.JSONResponse) { + // Note: getMediaMetadataFromActiveRequest uses mutexes and conditions from activeRemoteRequests + mediaMetadata, resErr := r.getMediaMetadataFromActiveRequest(activeRemoteRequests) if resErr != nil { return resErr } else if mediaMetadata != nil { - // If we have a record, we can respond from the local file + // If we got metadata from an active request, we can respond from the local file r.MediaMetadata = mediaMetadata } else { - // If we do not have a record, we need to fetch the remote file first and then respond from the local file - // Note: getRemoteFile uses mutexes and conditions from activeRemoteRequests - if resErr := r.getRemoteFile(cfg.AbsBasePath, cfg.MaxFileSizeBytes, db, activeRemoteRequests); resErr != nil { - return resErr - } - } - return r.respondFromLocalFile(w, cfg.AbsBasePath) -} - -func (r *downloadRequest) getMediaMetadataForRemoteFile(db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests) (*types.MediaMetadata, *util.JSONResponse) { - activeRemoteRequests.Lock() - defer activeRemoteRequests.Unlock() - - // check if we have a record of the media in our database - mediaMetadata, err := db.GetMediaMetadata(r.MediaMetadata.MediaID, r.MediaMetadata.Origin) - if err != nil { - r.Logger.WithError(err).Error("Error querying the database.") - resErr := jsonerror.InternalServerError() - return nil, &resErr - } - - if mediaMetadata != nil { - // If we have a record, we can respond from the local file - return mediaMetadata, nil - } - - // No record was found - - // Check if there is an active remote request for the file - mxcURL := "mxc://" + string(r.MediaMetadata.Origin) + "/" + string(r.MediaMetadata.MediaID) - if activeRemoteRequestResult, ok := activeRemoteRequests.MXCToResult[mxcURL]; ok { - r.Logger.Info("Waiting for another goroutine to fetch the remote file.") - - // NOTE: Wait unlocks and locks again internally. There is still a deferred Unlock() that will unlock this. - activeRemoteRequestResult.Cond.Wait() + // Note: This is an active request that MUST broadcastMediaMetadata to wake up waiting goroutines! + // Note: errorResponse is the named return variable + // Note: broadcastMediaMetadata uses mutexes and conditions from activeRemoteRequests + defer r.broadcastMediaMetadata(activeRemoteRequests, errorResponse) // check if we have a record of the media in our database mediaMetadata, err := db.GetMediaMetadata(r.MediaMetadata.MediaID, r.MediaMetadata.Origin) if err != nil { r.Logger.WithError(err).Error("Error querying the database.") resErr := jsonerror.InternalServerError() - return nil, &resErr + return &resErr } - if mediaMetadata != nil { + if mediaMetadata == nil { + // If we do not have a record, we need to fetch the remote file first and then respond from the local file + resErr := r.fetchRemoteFileAndStoreMetadata(cfg.AbsBasePath, cfg.MaxFileSizeBytes, db) + if resErr != nil { + return resErr + } + } else { // If we have a record, we can respond from the local file - return mediaMetadata, nil + r.MediaMetadata = mediaMetadata + } + } + return +} + +func (r *downloadRequest) getMediaMetadataFromActiveRequest(activeRemoteRequests *types.ActiveRemoteRequests) (*types.MediaMetadata, *util.JSONResponse) { + // Check if there is an active remote request for the file + mxcURL := "mxc://" + string(r.MediaMetadata.Origin) + "/" + string(r.MediaMetadata.MediaID) + + activeRemoteRequests.Lock() + defer activeRemoteRequests.Unlock() + + if activeRemoteRequestResult, ok := activeRemoteRequests.MXCToResult[mxcURL]; ok { + r.Logger.Info("Waiting for another goroutine to fetch the remote file.") + + // NOTE: Wait unlocks and locks again internally. There is still a deferred Unlock() that will unlock this. + activeRemoteRequestResult.Cond.Wait() + if activeRemoteRequestResult.ErrorResponse != nil { + return nil, activeRemoteRequestResult.ErrorResponse } - // Note: if the result was 200, we shouldn't get here - switch activeRemoteRequestResult.Result { - case 404: + if activeRemoteRequestResult.MediaMetadata == nil { return nil, &util.JSONResponse{ Code: 404, JSON: jsonerror.NotFound("File not found."), } - case 500: - r.Logger.Error("Other goroutine failed to fetch the remote file.") - resErr := jsonerror.InternalServerError() - return nil, &resErr - default: - r.Logger.Error("Other goroutine failed to fetch the remote file.") - return nil, &util.JSONResponse{ - Code: activeRemoteRequestResult.Result, - JSON: jsonerror.Unknown("Failed to fetch file from remote server."), - } } + + return activeRemoteRequestResult.MediaMetadata, nil } // No active remote request so create one activeRemoteRequests.MXCToResult[mxcURL] = &types.RemoteRequestResult{ Cond: &sync.Cond{L: activeRemoteRequests}, } + return nil, nil } -// getRemoteFile fetches the file from the remote server and stores its metadata in the database +// broadcastMediaMetadata broadcasts the media metadata and error response to waiting goroutines // Only the owner of the activeRemoteRequestResult for this origin and media ID should call this function. -func (r *downloadRequest) getRemoteFile(absBasePath types.Path, maxFileSizeBytes types.FileSizeBytes, db *storage.Database, activeRemoteRequests *types.ActiveRemoteRequests) *util.JSONResponse { - // Wake up other goroutines after this function returns. - isError := true - var result int - defer func() { - if isError { - // If an error happens, the lock MUST NOT have been taken, isError MUST be true and so the lock is taken here. - activeRemoteRequests.Lock() - } - defer activeRemoteRequests.Unlock() - mxcURL := "mxc://" + string(r.MediaMetadata.Origin) + "/" + string(r.MediaMetadata.MediaID) - if activeRemoteRequestResult, ok := activeRemoteRequests.MXCToResult[mxcURL]; ok { - r.Logger.Info("Signalling other goroutines waiting for this goroutine to fetch the file.") - if result == 0 { - r.Logger.Error("Invalid result, treating as InternalServerError") - result = 500 - } - activeRemoteRequestResult.Result = result - activeRemoteRequestResult.Cond.Broadcast() - } - delete(activeRemoteRequests.MXCToResult, mxcURL) - }() +func (r *downloadRequest) broadcastMediaMetadata(activeRemoteRequests *types.ActiveRemoteRequests, errorResponse *util.JSONResponse) { + activeRemoteRequests.Lock() + defer activeRemoteRequests.Unlock() + mxcURL := "mxc://" + string(r.MediaMetadata.Origin) + "/" + string(r.MediaMetadata.MediaID) + if activeRemoteRequestResult, ok := activeRemoteRequests.MXCToResult[mxcURL]; ok { + r.Logger.Info("Signalling other goroutines waiting for this goroutine to fetch the file.") + activeRemoteRequestResult.MediaMetadata = r.MediaMetadata + activeRemoteRequestResult.ErrorResponse = errorResponse + activeRemoteRequestResult.Cond.Broadcast() + } + delete(activeRemoteRequests.MXCToResult, mxcURL) +} +// fetchRemoteFileAndStoreMetadata fetches the file from the remote server and stores its metadata in the database +func (r *downloadRequest) fetchRemoteFileAndStoreMetadata(absBasePath types.Path, maxFileSizeBytes types.FileSizeBytes, db *storage.Database) *util.JSONResponse { finalPath, duplicate, resErr := r.fetchRemoteFile(absBasePath, maxFileSizeBytes) if resErr != nil { - result = resErr.Code return resErr } - // NOTE: Writing the metadata to the media repository database and removing the mxcURL from activeRemoteRequests needs to be atomic. - // If it were not atomic, a new request for the same file could come in in routine A and check the database before the INSERT. - // Routine B which was fetching could then have its INSERT complete and remove the mxcURL from the activeRemoteRequests. - // If routine A then checked the activeRemoteRequests it would think it needed to fetch the file when it's already in the database. - // The locking below mitigates this situation. - - // NOTE: The following two lines MUST remain together! - // isError == true causes the lock to be taken in a deferred function! - activeRemoteRequests.Lock() - isError = false - r.Logger.WithFields(log.Fields{ "Base64Hash": r.MediaMetadata.Base64Hash, "UploadName": r.MediaMetadata.UploadName, @@ -357,7 +327,6 @@ func (r *downloadRequest) getRemoteFile(absBasePath types.Path, maxFileSizeBytes // NOTE: It should really not be possible to fail the uniqueness test here so // there is no need to handle that separately resErr := jsonerror.InternalServerError() - result = resErr.Code return &resErr } @@ -370,7 +339,6 @@ func (r *downloadRequest) getRemoteFile(absBasePath types.Path, maxFileSizeBytes "Content-Type": r.MediaMetadata.ContentType, }).Infof("Remote file cached") - result = 200 return nil } From ce21b32d38f7506356c8f09ea233dd898daf1feb Mon Sep 17 00:00:00 2001 From: Robert Swain Date: Thu, 1 Jun 2017 14:44:00 +0200 Subject: [PATCH 05/10] mediaapi/writers/download: Wrap broadcast call in closure to re-evaluate args --- .../matrix-org/dendrite/mediaapi/writers/download.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go index eac5d764..65f4572a 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go @@ -226,9 +226,11 @@ func (r *downloadRequest) getRemoteFile(cfg *config.MediaAPI, db *storage.Databa r.MediaMetadata = mediaMetadata } else { // Note: This is an active request that MUST broadcastMediaMetadata to wake up waiting goroutines! - // Note: errorResponse is the named return variable // Note: broadcastMediaMetadata uses mutexes and conditions from activeRemoteRequests - defer r.broadcastMediaMetadata(activeRemoteRequests, errorResponse) + defer func() { + // Note: errorResponse is the named return variable so we wrap this in a closure to re-evaluate the arguments at defer-time + r.broadcastMediaMetadata(activeRemoteRequests, errorResponse) + }() // check if we have a record of the media in our database mediaMetadata, err := db.GetMediaMetadata(r.MediaMetadata.MediaID, r.MediaMetadata.Origin) From cae309fa2644ac3753f1e5bb3c0eb6e8a5307a72 Mon Sep 17 00:00:00 2001 From: Robert Swain Date: Thu, 1 Jun 2017 14:54:59 +0200 Subject: [PATCH 06/10] mediaapi/writers/download: Handle panic to ensure waking of goroutines If the active request were to panic, we need to ensure all the waiting goroutines get woken up. --- .../matrix-org/dendrite/mediaapi/writers/download.go | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go index 65f4572a..c320e512 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go @@ -229,6 +229,11 @@ func (r *downloadRequest) getRemoteFile(cfg *config.MediaAPI, db *storage.Databa // Note: broadcastMediaMetadata uses mutexes and conditions from activeRemoteRequests defer func() { // Note: errorResponse is the named return variable so we wrap this in a closure to re-evaluate the arguments at defer-time + if err := recover(); err != nil { + resErr := jsonerror.InternalServerError() + r.broadcastMediaMetadata(activeRemoteRequests, &resErr) + panic(err) + } r.broadcastMediaMetadata(activeRemoteRequests, errorResponse) }() From 2d822c57f986a054baf7bfaa76bd14ca5a862b5f Mon Sep 17 00:00:00 2001 From: Robert Swain Date: Thu, 1 Jun 2017 15:30:36 +0200 Subject: [PATCH 07/10] mediaapi/writers/download: Use DNS address as not all HSes have SRV --- .../matrix-org/dendrite/mediaapi/writers/download.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go index c320e512..c13e52e3 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go @@ -25,7 +25,6 @@ import ( "path/filepath" "regexp" "strconv" - "strings" "sync" log "github.com/Sirupsen/logrus" @@ -431,7 +430,7 @@ func (r *downloadRequest) createRemoteRequest() (*http.Response, *util.JSONRespo resErr := jsonerror.InternalServerError() return nil, &resErr } - url := "https://" + strings.Trim(dnsResult.SRVRecords[0].Target, ".") + ":" + strconv.Itoa(int(dnsResult.SRVRecords[0].Port)) + url := "https://" + dnsResult.Addrs[0] r.Logger.WithField("URL", url).Info("Connecting to remote") From a398cdd193554c2ab46dbe67302f5401be6945a1 Mon Sep 17 00:00:00 2001 From: Robert Swain Date: Thu, 1 Jun 2017 16:04:41 +0200 Subject: [PATCH 08/10] mediaapi/writers/download: Use mime.ParseMediaType to parse Content-Disposition --- .../dendrite/mediaapi/writers/download.go | 28 ++++++------------- 1 file changed, 8 insertions(+), 20 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go index c13e52e3..dc575696 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go @@ -18,9 +18,9 @@ import ( "encoding/json" "fmt" "io" + "mime" "net" "net/http" - "net/url" "os" "path/filepath" "regexp" @@ -375,7 +375,10 @@ func (r *downloadRequest) fetchRemoteFile(absBasePath types.Path, maxFileSizeByt } r.MediaMetadata.FileSizeBytes = types.FileSizeBytes(contentLength) r.MediaMetadata.ContentType = types.ContentType(resp.Header.Get("Content-Type")) - r.MediaMetadata.UploadName = types.Filename(contentDispositionToFilename(resp.Header.Get("Content-Disposition"))) + _, params, err := mime.ParseMediaType(resp.Header.Get("Content-Disposition")) + if err == nil && params["filename"] != "" { + r.MediaMetadata.UploadName = types.Filename(params["filename"]) + } r.Logger.Info("Transferring remote file") @@ -430,11 +433,11 @@ func (r *downloadRequest) createRemoteRequest() (*http.Response, *util.JSONRespo resErr := jsonerror.InternalServerError() return nil, &resErr } - url := "https://" + dnsResult.Addrs[0] + httpsURL := "https://" + dnsResult.Addrs[0] - r.Logger.WithField("URL", url).Info("Connecting to remote") + r.Logger.WithField("URL", httpsURL).Info("Connecting to remote") - remoteReqAddr := url + "/_matrix/media/v1/download/" + string(r.MediaMetadata.Origin) + "/" + string(r.MediaMetadata.MediaID) + remoteReqAddr := httpsURL + "/_matrix/media/v1/download/" + string(r.MediaMetadata.Origin) + "/" + string(r.MediaMetadata.MediaID) remoteReq, err := http.NewRequest("GET", remoteReqAddr, nil) if err != nil { resErr := jsonerror.InternalServerError() @@ -471,18 +474,3 @@ func (r *downloadRequest) createRemoteRequest() (*http.Response, *util.JSONRespo return resp, nil } - -var contentDispositionRegex = regexp.MustCompile("filename([*])?=(utf-8'')?([A-Za-z0-9._-]+)") - -func contentDispositionToFilename(contentDisposition string) types.Filename { - filename := "" - if matches := contentDispositionRegex.FindStringSubmatch(contentDisposition); len(matches) == 4 { - // Note: the filename should already be escaped. If not, unescape should be close to a no-op. This way filename is sure to be safe. - unescaped, err := url.PathUnescape(matches[3]) - if err != nil { - unescaped = matches[3] - } - filename = url.PathEscape(unescaped) - } - return types.Filename(filename) -} From 0fff33928e4c193b530eb81d9f00cd2aa1f15dcf Mon Sep 17 00:00:00 2001 From: Robert Swain Date: Thu, 1 Jun 2017 17:56:30 +0200 Subject: [PATCH 09/10] vendor: Bump gomatrixserverlib to pull in CreateMediaDownloadRequest --- vendor/manifest | 2 +- .../github.com/matrix-org/gomatrixserverlib/client.go | 11 +++++++++++ 2 files changed, 12 insertions(+), 1 deletion(-) diff --git a/vendor/manifest b/vendor/manifest index e76c66cc..b5725ca3 100644 --- a/vendor/manifest +++ b/vendor/manifest @@ -98,7 +98,7 @@ { "importpath": "github.com/matrix-org/gomatrixserverlib", "repository": "https://github.com/matrix-org/gomatrixserverlib", - "revision": "c396ef3cc1e546729f7052f1f48e345cc59269f4", + "revision": "b1dfcb3b345cc8410f1a03fec0a1ffe6bd002dcd", "branch": "master" }, { diff --git a/vendor/src/github.com/matrix-org/gomatrixserverlib/client.go b/vendor/src/github.com/matrix-org/gomatrixserverlib/client.go index 40c9ebe4..7d322f93 100644 --- a/vendor/src/github.com/matrix-org/gomatrixserverlib/client.go +++ b/vendor/src/github.com/matrix-org/gomatrixserverlib/client.go @@ -220,3 +220,14 @@ func (fc *Client) LookupServerKeys( } return result, nil } + +// CreateMediaDownloadRequest creates a request for media on a homeserver and returns the http.Response or an error +func (fc *Client) CreateMediaDownloadRequest(matrixServer ServerName, mediaID string) (*http.Response, error) { + requestURL := "matrix://" + string(matrixServer) + "/_matrix/media/v1/download/" + string(matrixServer) + "/" + mediaID + resp, err := fc.client.Get(requestURL) + if err != nil { + return nil, err + } + + return resp, nil +} From 7244fc15b3c8b3e8c5c3057a6ac78648c44362fa Mon Sep 17 00:00:00 2001 From: Robert Swain Date: Thu, 1 Jun 2017 17:57:05 +0200 Subject: [PATCH 10/10] media/writers/download: Make use of CreateMediaDownloadRequest from lib --- .../dendrite/mediaapi/writers/download.go | 32 +++---------------- 1 file changed, 4 insertions(+), 28 deletions(-) diff --git a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go index dc575696..af31535b 100644 --- a/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go +++ b/src/github.com/matrix-org/dendrite/mediaapi/writers/download.go @@ -19,7 +19,6 @@ import ( "fmt" "io" "mime" - "net" "net/http" "os" "path/filepath" @@ -422,34 +421,11 @@ func (r *downloadRequest) fetchRemoteFile(absBasePath types.Path, maxFileSizeByt } func (r *downloadRequest) createRemoteRequest() (*http.Response, *util.JSONResponse) { - dnsResult, err := gomatrixserverlib.LookupServer(r.MediaMetadata.Origin) + matrixClient := gomatrixserverlib.NewClient() + + resp, err := matrixClient.CreateMediaDownloadRequest(r.MediaMetadata.Origin, string(r.MediaMetadata.MediaID)) if err != nil { - if dnsErr, ok := err.(*net.DNSError); ok && dnsErr.Timeout() { - return nil, &util.JSONResponse{ - Code: 504, - JSON: jsonerror.Unknown(fmt.Sprintf("DNS look up for homeserver at %v timed out", r.MediaMetadata.Origin)), - } - } - resErr := jsonerror.InternalServerError() - return nil, &resErr - } - httpsURL := "https://" + dnsResult.Addrs[0] - - r.Logger.WithField("URL", httpsURL).Info("Connecting to remote") - - remoteReqAddr := httpsURL + "/_matrix/media/v1/download/" + string(r.MediaMetadata.Origin) + "/" + string(r.MediaMetadata.MediaID) - remoteReq, err := http.NewRequest("GET", remoteReqAddr, nil) - if err != nil { - resErr := jsonerror.InternalServerError() - return nil, &resErr - } - - remoteReq.Header.Set("Host", string(r.MediaMetadata.Origin)) - - client := http.Client{} - resp, err := client.Do(remoteReq) - if err != nil { - r.Logger.Warn("Failed to execute request for remote file") + r.Logger.WithError(err).Error("Failed to create download request") return nil, &util.JSONResponse{ Code: 502, JSON: jsonerror.Unknown(fmt.Sprintf("File with media ID %q could not be downloaded from %q", r.MediaMetadata.MediaID, r.MediaMetadata.Origin)),