From 701d1ab638b23c24877fc41824add66232446676 Mon Sep 17 00:00:00 2001 From: Christopher Speller Date: Thu, 2 Feb 2017 09:32:00 -0500 Subject: Updating server dependancies (#5249) --- .../minio/minio-go/api-put-object-file.go | 121 +++++++++++---------- 1 file changed, 61 insertions(+), 60 deletions(-) (limited to 'vendor/github.com/minio/minio-go/api-put-object-file.go') diff --git a/vendor/github.com/minio/minio-go/api-put-object-file.go b/vendor/github.com/minio/minio-go/api-put-object-file.go index deaed0acd..aa554b321 100644 --- a/vendor/github.com/minio/minio-go/api-put-object-file.go +++ b/vendor/github.com/minio/minio-go/api-put-object-file.go @@ -28,6 +28,8 @@ import ( "os" "path/filepath" "sort" + + "github.com/minio/minio-go/pkg/s3utils" ) // FPutObject - Create an object in a bucket, with contents from file at filePath. @@ -62,6 +64,8 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string) return 0, ErrEntityTooLarge(fileSize, maxMultipartPutObjectSize, bucketName, objectName) } + objMetadata := make(map[string][]string) + // Set contentType based on filepath extension if not given or default // value of "binary/octet-stream" if the extension has no associated type. if contentType == "" { @@ -70,9 +74,11 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string) } } + objMetadata["Content-Type"] = []string{contentType} + // NOTE: Google Cloud Storage multipart Put is not compatible with Amazon S3 APIs. // Current implementation will only upload a maximum of 5GiB to Google Cloud Storage servers. - if isGoogleEndpoint(c.endpointURL) { + if s3utils.IsGoogleEndpoint(c.endpointURL) { if fileSize > int64(maxSinglePutObjectSize) { return 0, ErrorResponse{ Code: "NotImplemented", @@ -82,11 +88,11 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string) } } // Do not compute MD5 for Google Cloud Storage. Uploads up to 5GiB in size. - return c.putObjectNoChecksum(bucketName, objectName, fileReader, fileSize, contentType, nil) + return c.putObjectNoChecksum(bucketName, objectName, fileReader, fileSize, objMetadata, nil) } // NOTE: S3 doesn't allow anonymous multipart requests. - if isAmazonEndpoint(c.endpointURL) && c.anonymous { + if s3utils.IsAmazonEndpoint(c.endpointURL) && c.anonymous { if fileSize > int64(maxSinglePutObjectSize) { return 0, ErrorResponse{ Code: "NotImplemented", @@ -97,15 +103,15 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string) } // Do not compute MD5 for anonymous requests to Amazon // S3. Uploads up to 5GiB in size. - return c.putObjectNoChecksum(bucketName, objectName, fileReader, fileSize, contentType, nil) + return c.putObjectNoChecksum(bucketName, objectName, fileReader, fileSize, objMetadata, nil) } // Small object upload is initiated for uploads for input data size smaller than 5MiB. if fileSize < minPartSize && fileSize >= 0 { - return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, contentType, nil) + return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, objMetadata, nil) } // Upload all large objects as multipart. - n, err = c.putObjectMultipartFromFile(bucketName, objectName, fileReader, fileSize, contentType, nil) + n, err = c.putObjectMultipartFromFile(bucketName, objectName, fileReader, fileSize, objMetadata, nil) if err != nil { errResp := ToErrorResponse(err) // Verify if multipart functionality is not available, if not @@ -116,7 +122,7 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string) return 0, ErrEntityTooLarge(fileSize, maxSinglePutObjectSize, bucketName, objectName) } // Fall back to uploading as single PutObject operation. - return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, contentType, nil) + return c.putObjectSingle(bucketName, objectName, fileReader, fileSize, objMetadata, nil) } return n, err } @@ -131,7 +137,7 @@ func (c Client) FPutObject(bucketName, objectName, filePath, contentType string) // against MD5SUM of each individual parts. This function also // effectively utilizes file system capabilities of reading from // specific sections and not having to create temporary files. -func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileReader io.ReaderAt, fileSize int64, contentType string, progress io.Reader) (int64, error) { +func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileReader io.ReaderAt, fileSize int64, metaData map[string][]string, progress io.Reader) (int64, error) { // Input validation. if err := isValidBucketName(bucketName); err != nil { return 0, err @@ -140,9 +146,8 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe return 0, err } - // Get upload id for an object, initiates a new multipart request - // if it cannot find any previously partially uploaded object. - uploadID, isNew, err := c.getUploadID(bucketName, objectName, contentType) + // Get the upload id of a previously partially uploaded object or initiate a new multipart upload + uploadID, partsInfo, err := c.getMpartUploadSession(bucketName, objectName, metaData) if err != nil { return 0, err } @@ -153,19 +158,6 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe // Complete multipart upload. var complMultipartUpload completeMultipartUpload - // A map of all uploaded parts. - var partsInfo = make(map[int]objectPart) - - // If this session is a continuation of a previous session fetch all - // previously uploaded parts info. - if !isNew { - // Fetch previously upload parts and maximum part size. - partsInfo, err = c.listObjectParts(bucketName, objectName, uploadID) - if err != nil { - return 0, err - } - } - // Calculate the optimal parts info for a given size. totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(fileSize) if err != nil { @@ -178,14 +170,19 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe // Create a channel to communicate which part to upload. // Buffer this to 10000, the maximum number of parts allowed by S3. - uploadPartsCh := make(chan int, 10000) + uploadPartsCh := make(chan uploadPartReq, 10000) // Just for readability. lastPartNumber := totalPartsCount // Send each part through the partUploadCh to be uploaded. for p := 1; p <= totalPartsCount; p++ { - uploadPartsCh <- p + part, ok := partsInfo[p] + if ok { + uploadPartsCh <- uploadPartReq{PartNum: p, Part: &part} + } else { + uploadPartsCh <- uploadPartReq{PartNum: p, Part: nil} + } } close(uploadPartsCh) @@ -193,7 +190,7 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe for w := 1; w <= 3; w++ { go func() { // Deal with each part as it comes through the channel. - for partNumber := range uploadPartsCh { + for uploadReq := range uploadPartsCh { // Add hash algorithms that need to be calculated by computeHash() // In case of a non-v4 signature or https connection, sha256 is not needed. hashAlgos := make(map[string]hash.Hash) @@ -203,47 +200,50 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe hashAlgos["sha256"] = sha256.New() } + // If partNumber was not uploaded we calculate the missing + // part offset and size. For all other part numbers we + // calculate offset based on multiples of partSize. + readOffset := int64(uploadReq.PartNum-1) * partSize + missingPartSize := partSize + + // As a special case if partNumber is lastPartNumber, we + // calculate the offset based on the last part size. + if uploadReq.PartNum == lastPartNumber { + readOffset = (fileSize - lastPartSize) + missingPartSize = lastPartSize + } + + // Get a section reader on a particular offset. + sectionReader := io.NewSectionReader(fileReader, readOffset, missingPartSize) + var prtSize int64 + var err error + + prtSize, err = computeHash(hashAlgos, hashSums, sectionReader) + if err != nil { + uploadedPartsCh <- uploadedPartRes{ + Error: err, + } + // Exit the goroutine. + return + } + // Create the part to be uploaded. verifyObjPart := objectPart{ ETag: hex.EncodeToString(hashSums["md5"]), - PartNumber: partNumber, + PartNumber: uploadReq.PartNum, Size: partSize, } + // If this is the last part do not give it the full part size. - if partNumber == lastPartNumber { + if uploadReq.PartNum == lastPartNumber { verifyObjPart.Size = lastPartSize } // Verify if part should be uploaded. - if shouldUploadPart(verifyObjPart, partsInfo) { - // If partNumber was not uploaded we calculate the missing - // part offset and size. For all other part numbers we - // calculate offset based on multiples of partSize. - readOffset := int64(partNumber-1) * partSize - missingPartSize := partSize - - // As a special case if partNumber is lastPartNumber, we - // calculate the offset based on the last part size. - if partNumber == lastPartNumber { - readOffset = (fileSize - lastPartSize) - missingPartSize = lastPartSize - } - - // Get a section reader on a particular offset. - sectionReader := io.NewSectionReader(fileReader, readOffset, missingPartSize) - var prtSize int64 - prtSize, err = computeHash(hashAlgos, hashSums, sectionReader) - if err != nil { - uploadedPartsCh <- uploadedPartRes{ - Error: err, - } - // Exit the goroutine. - return - } - + if shouldUploadPart(verifyObjPart, uploadReq) { // Proceed to upload the part. var objPart objectPart - objPart, err = c.uploadPart(bucketName, objectName, uploadID, sectionReader, partNumber, hashSums["md5"], hashSums["sha256"], prtSize) + objPart, err = c.uploadPart(bucketName, objectName, uploadID, sectionReader, uploadReq.PartNum, hashSums["md5"], hashSums["sha256"], prtSize) if err != nil { uploadedPartsCh <- uploadedPartRes{ Error: err, @@ -252,12 +252,13 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe return } // Save successfully uploaded part metadata. - partsInfo[partNumber] = objPart + uploadReq.Part = &objPart } // Return through the channel the part size. uploadedPartsCh <- uploadedPartRes{ Size: verifyObjPart.Size, - PartNum: partNumber, + PartNum: uploadReq.PartNum, + Part: uploadReq.Part, Error: nil, } } @@ -271,8 +272,8 @@ func (c Client) putObjectMultipartFromFile(bucketName, objectName string, fileRe return totalUploadedSize, uploadRes.Error } // Retrieve each uploaded part and store it to be completed. - part, ok := partsInfo[uploadRes.PartNum] - if !ok { + part := uploadRes.Part + if part == nil { return totalUploadedSize, ErrInvalidArgument(fmt.Sprintf("Missing part number %d", uploadRes.PartNum)) } // Update the total uploaded size. -- cgit v1.2.3-1-g7c22