From 302ec17beed9128101ef61d69b45d3ee29e16f1e Mon Sep 17 00:00:00 2001 From: George Goldberg Date: Fri, 28 Apr 2017 17:54:04 +0100 Subject: Parallelise Bulk Import. (#6267) * Parallelise Bulk Import. * Set worker count through command line flag. --- app/import.go | 70 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++--- 1 file changed, 67 insertions(+), 3 deletions(-) (limited to 'app/import.go') diff --git a/app/import.go b/app/import.go index f92c9b1cc..488cb22aa 100644 --- a/app/import.go +++ b/app/import.go @@ -11,6 +11,7 @@ import ( "net/http" "regexp" "strings" + "sync" "unicode/utf8" l4g "github.com/alecthomas/log4go" @@ -95,15 +96,40 @@ type PostImportData struct { CreateAt *int64 `json:"create_at"` } +type LineImportWorkerData struct { + LineImportData + LineNumber int +} + +type LineImportWorkerError struct { + Error *model.AppError + LineNumber int +} + // // -- Bulk Import Functions -- // These functions import data directly into the database. Security and permission checks are bypassed but validity is // still enforced. // -func BulkImport(fileReader io.Reader, dryRun bool) (*model.AppError, int) { +func bulkImportWorker(dryRun bool, wg *sync.WaitGroup, lines <-chan LineImportWorkerData, errors chan<- LineImportWorkerError) { + for line := range lines { + if err := ImportLine(line.LineImportData, dryRun); err != nil { + errors <- LineImportWorkerError{err, line.LineNumber} + } + } + wg.Done() +} + +func BulkImport(fileReader io.Reader, dryRun bool, workers int) (*model.AppError, int) { scanner := bufio.NewScanner(fileReader) lineNumber := 0 + + errorsChan := make(chan LineImportWorkerError, (2*workers)+1) // size chosen to ensure it never gets filled up completely. + var wg sync.WaitGroup + var linesChan chan LineImportWorkerData + lastLineType := "" + for scanner.Scan() { decoder := json.NewDecoder(strings.NewReader(scanner.Text())) lineNumber++ @@ -121,12 +147,50 @@ func BulkImport(fileReader io.Reader, dryRun bool) (*model.AppError, int) { if importDataFileVersion != 1 { return model.NewAppError("BulkImport", "app.import.bulk_import.unsupported_version.error", nil, "", http.StatusBadRequest), lineNumber } - } else if err := ImportLine(line, dryRun); err != nil { - return err, lineNumber + } else { + if line.Type != lastLineType { + if lastLineType != "" { + // Changing type. Clear out the worker queue before continuing. + close(linesChan) + wg.Wait() + + // Check no errors occurred while waiting for the queue to empty. + if len(errorsChan) != 0 { + err := <-errorsChan + return err.Error, err.LineNumber + } + } + + // Set up the workers and channel for this type. + lastLineType = line.Type + linesChan = make(chan LineImportWorkerData, workers) + for i := 0; i < workers; i++ { + wg.Add(1) + go bulkImportWorker(dryRun, &wg, linesChan, errorsChan) + } + } + + select { + case linesChan <- LineImportWorkerData{line, lineNumber}: + case err := <-errorsChan: + close(linesChan) + wg.Wait() + return err.Error, err.LineNumber + } } } } + // No more lines. Clear out the worker queue before continuing. + close(linesChan) + wg.Wait() + + // Check no errors occurred while waiting for the queue to empty. + if len(errorsChan) != 0 { + err := <-errorsChan + return err.Error, err.LineNumber + } + if err := scanner.Err(); err != nil { return model.NewLocAppError("BulkImport", "app.import.bulk_import.file_scan.error", nil, err.Error()), 0 } -- cgit v1.2.3-1-g7c22