diff options
author | Christopher Speller <crspeller@gmail.com> | 2017-02-02 09:32:00 -0500 |
---|---|---|
committer | Harrison Healey <harrisonmhealey@gmail.com> | 2017-02-02 09:32:00 -0500 |
commit | 701d1ab638b23c24877fc41824add66232446676 (patch) | |
tree | ec120c88d38ac9d38d9eabdd3270b52bb6ac9d96 /vendor/github.com/minio/minio-go/api-put-object-readat.go | |
parent | ca3211bc04f6dea34e8168217182637d1419f998 (diff) | |
download | chat-701d1ab638b23c24877fc41824add66232446676.tar.gz chat-701d1ab638b23c24877fc41824add66232446676.tar.bz2 chat-701d1ab638b23c24877fc41824add66232446676.zip |
Updating server dependancies (#5249)
Diffstat (limited to 'vendor/github.com/minio/minio-go/api-put-object-readat.go')
-rw-r--r-- | vendor/github.com/minio/minio-go/api-put-object-readat.go | 135 |
1 files changed, 68 insertions, 67 deletions
diff --git a/vendor/github.com/minio/minio-go/api-put-object-readat.go b/vendor/github.com/minio/minio-go/api-put-object-readat.go index 14fa4b296..4ab1095f6 100644 --- a/vendor/github.com/minio/minio-go/api-put-object-readat.go +++ b/vendor/github.com/minio/minio-go/api-put-object-readat.go @@ -32,17 +32,22 @@ type uploadedPartRes struct { Error error // Any error encountered while uploading the part. PartNum int // Number of the part uploaded. Size int64 // Size of the part uploaded. + Part *objectPart +} + +type uploadPartReq struct { + PartNum int // Number of the part uploaded. + Part *objectPart // Size of the part uploaded. } // shouldUploadPartReadAt - verify if part should be uploaded. -func shouldUploadPartReadAt(objPart objectPart, objectParts map[int]objectPart) bool { +func shouldUploadPartReadAt(objPart objectPart, uploadReq uploadPartReq) bool { // If part not found part should be uploaded. - uploadedPart, found := objectParts[objPart.PartNumber] - if !found { + if uploadReq.Part == nil { return true } // if size mismatches part should be uploaded. - if uploadedPart.Size != objPart.Size { + if uploadReq.Part.Size != objPart.Size { return true } return false @@ -58,7 +63,7 @@ func shouldUploadPartReadAt(objPart objectPart, objectParts map[int]objectPart) // temporary files for staging all the data, these temporary files are // cleaned automatically when the caller i.e http client closes the // stream after uploading all the contents successfully. -func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, reader io.ReaderAt, size int64, contentType string, progress io.Reader) (n int64, err error) { +func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, reader io.ReaderAt, size int64, metaData map[string][]string, progress io.Reader) (n int64, err error) { // Input validation. if err := isValidBucketName(bucketName); err != nil { return 0, err @@ -67,9 +72,8 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read return 0, err } - // Get upload id for an object, initiates a new multipart request - // if it cannot find any previously partially uploaded object. - uploadID, isNew, err := c.getUploadID(bucketName, objectName, contentType) + // Get the upload id of a previously partially uploaded object or initiate a new multipart upload + uploadID, partsInfo, err := c.getMpartUploadSession(bucketName, objectName, metaData) if err != nil { return 0, err } @@ -80,17 +84,6 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read // Complete multipart upload. var complMultipartUpload completeMultipartUpload - // A map of all uploaded parts. - var partsInfo = make(map[int]objectPart) - - // Fetch all parts info previously uploaded. - if !isNew { - partsInfo, err = c.listObjectParts(bucketName, objectName, uploadID) - if err != nil { - return 0, err - } - } - // Calculate the optimal parts info for a given size. totalPartsCount, partSize, lastPartSize, err := optimalPartInfo(size) if err != nil { @@ -103,7 +96,7 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read // Declare a channel that sends the next part number to be uploaded. // Buffered to 10000 because thats the maximum number of parts allowed // by S3. - uploadPartsCh := make(chan int, 10000) + uploadPartsCh := make(chan uploadPartReq, 10000) // Declare a channel that sends back the response of a part upload. // Buffered to 10000 because thats the maximum number of parts allowed @@ -112,7 +105,12 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read // Send each part number to the channel to be processed. for p := 1; p <= totalPartsCount; p++ { - uploadPartsCh <- p + part, ok := partsInfo[p] + if ok { + uploadPartsCh <- uploadPartReq{PartNum: p, Part: &part} + } else { + uploadPartsCh <- uploadPartReq{PartNum: p, Part: nil} + } } close(uploadPartsCh) @@ -123,64 +121,65 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read readAtBuffer := make([]byte, optimalReadBufferSize) // Each worker will draw from the part channel and upload in parallel. - for partNumber := range uploadPartsCh { + for uploadReq := range uploadPartsCh { // Declare a new tmpBuffer. tmpBuffer := new(bytes.Buffer) + // If partNumber was not uploaded we calculate the missing + // part offset and size. For all other part numbers we + // calculate offset based on multiples of partSize. + readOffset := int64(uploadReq.PartNum-1) * partSize + missingPartSize := partSize + + // As a special case if partNumber is lastPartNumber, we + // calculate the offset based on the last part size. + if uploadReq.PartNum == lastPartNumber { + readOffset = (size - lastPartSize) + missingPartSize = lastPartSize + } + + // Get a section reader on a particular offset. + sectionReader := io.NewSectionReader(reader, readOffset, missingPartSize) + + // Choose the needed hash algorithms to be calculated by hashCopyBuffer. + // Sha256 is avoided in non-v4 signature requests or HTTPS connections + hashSums := make(map[string][]byte) + hashAlgos := make(map[string]hash.Hash) + hashAlgos["md5"] = md5.New() + if c.signature.isV4() && !c.secure { + hashAlgos["sha256"] = sha256.New() + } + + var prtSize int64 + var err error + prtSize, err = hashCopyBuffer(hashAlgos, hashSums, tmpBuffer, sectionReader, readAtBuffer) + if err != nil { + // Send the error back through the channel. + uploadedPartsCh <- uploadedPartRes{ + Size: 0, + Error: err, + } + // Exit the goroutine. + return + } + // Verify object if its uploaded. verifyObjPart := objectPart{ - PartNumber: partNumber, + PartNumber: uploadReq.PartNum, Size: partSize, } // Special case if we see a last part number, save last part // size as the proper part size. - if partNumber == lastPartNumber { + if uploadReq.PartNum == lastPartNumber { verifyObjPart.Size = lastPartSize } // Only upload the necessary parts. Otherwise return size through channel // to update any progress bar. - if shouldUploadPartReadAt(verifyObjPart, partsInfo) { - // If partNumber was not uploaded we calculate the missing - // part offset and size. For all other part numbers we - // calculate offset based on multiples of partSize. - readOffset := int64(partNumber-1) * partSize - missingPartSize := partSize - - // As a special case if partNumber is lastPartNumber, we - // calculate the offset based on the last part size. - if partNumber == lastPartNumber { - readOffset = (size - lastPartSize) - missingPartSize = lastPartSize - } - - // Get a section reader on a particular offset. - sectionReader := io.NewSectionReader(reader, readOffset, missingPartSize) - - // Choose the needed hash algorithms to be calculated by hashCopyBuffer. - // Sha256 is avoided in non-v4 signature requests or HTTPS connections - hashSums := make(map[string][]byte) - hashAlgos := make(map[string]hash.Hash) - hashAlgos["md5"] = md5.New() - if c.signature.isV4() && !c.secure { - hashAlgos["sha256"] = sha256.New() - } - - var prtSize int64 - prtSize, err = hashCopyBuffer(hashAlgos, hashSums, tmpBuffer, sectionReader, readAtBuffer) - if err != nil { - // Send the error back through the channel. - uploadedPartsCh <- uploadedPartRes{ - Size: 0, - Error: err, - } - // Exit the goroutine. - return - } - + if shouldUploadPartReadAt(verifyObjPart, uploadReq) { // Proceed to upload the part. var objPart objectPart - objPart, err = c.uploadPart(bucketName, objectName, uploadID, tmpBuffer, partNumber, hashSums["md5"], hashSums["sha256"], prtSize) + objPart, err = c.uploadPart(bucketName, objectName, uploadID, tmpBuffer, uploadReq.PartNum, hashSums["md5"], hashSums["sha256"], prtSize) if err != nil { uploadedPartsCh <- uploadedPartRes{ Size: 0, @@ -190,12 +189,13 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read return } // Save successfully uploaded part metadata. - partsInfo[partNumber] = objPart + uploadReq.Part = &objPart } // Send successful part info through the channel. uploadedPartsCh <- uploadedPartRes{ Size: verifyObjPart.Size, - PartNum: partNumber, + PartNum: uploadReq.PartNum, + Part: uploadReq.Part, Error: nil, } } @@ -210,8 +210,9 @@ func (c Client) putObjectMultipartFromReadAt(bucketName, objectName string, read return totalUploadedSize, uploadRes.Error } // Retrieve each uploaded part and store it to be completed. - part, ok := partsInfo[uploadRes.PartNum] - if !ok { + // part, ok := partsInfo[uploadRes.PartNum] + part := uploadRes.Part + if part == nil { return 0, ErrInvalidArgument(fmt.Sprintf("Missing part number %d", uploadRes.PartNum)) } // Update the totalUploadedSize. |