gotosocial/vendor/github.com/minio/minio-go/v7/api-putobject-snowball.go

247 lines
6.0 KiB
Go

/*
* MinIO Go Library for Amazon S3 Compatible Cloud Storage
* Copyright 2021 MinIO, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package minio
import (
"archive/tar"
"bufio"
"bytes"
"context"
"fmt"
"io"
"net/http"
"os"
"strings"
"sync"
"time"
"github.com/klauspost/compress/s2"
)
// SnowballOptions contains options for PutObjectsSnowball calls.
type SnowballOptions struct {
// Opts is options applied to all objects.
Opts PutObjectOptions
// Processing options:
// InMemory specifies that all objects should be collected in memory
// before they are uploaded.
// If false a temporary file will be created.
InMemory bool
// Compress enabled content compression before upload.
// Compression will typically reduce memory and network usage,
// Compression can safely be enabled with MinIO hosts.
Compress bool
// SkipErrs if enabled will skip any errors while reading the
// object content while creating the snowball archive
SkipErrs bool
}
// SnowballObject contains information about a single object to be added to the snowball.
type SnowballObject struct {
// Key is the destination key, including prefix.
Key string
// Size is the content size of this object.
Size int64
// Modtime to apply to the object.
// If Modtime is the zero value current time will be used.
ModTime time.Time
// Content of the object.
// Exactly 'Size' number of bytes must be provided.
Content io.Reader
// VersionID of the object; if empty, a new versionID will be generated
VersionID string
// Headers contains more options for this object upload, the same as you
// would include in a regular PutObject operation, such as user metadata
// and content-disposition, expires, ..
Headers http.Header
// Close will be called when an object has finished processing.
// Note that if PutObjectsSnowball returns because of an error,
// objects not consumed from the input will NOT have been closed.
// Leave as nil for no callback.
Close func()
}
type nopReadSeekCloser struct {
io.ReadSeeker
}
func (n nopReadSeekCloser) Close() error {
return nil
}
// This is available as io.ReadSeekCloser from go1.16
type readSeekCloser interface {
io.Reader
io.Closer
io.Seeker
}
// PutObjectsSnowball will put multiple objects with a single put call.
// A (compressed) TAR file will be created which will contain multiple objects.
// The key for each object will be used for the destination in the specified bucket.
// Total size should be < 5TB.
// This function blocks until 'objs' is closed and the content has been uploaded.
func (c Client) PutObjectsSnowball(ctx context.Context, bucketName string, opts SnowballOptions, objs <-chan SnowballObject) (err error) {
err = opts.Opts.validate()
if err != nil {
return err
}
var tmpWriter io.Writer
var getTmpReader func() (rc readSeekCloser, sz int64, err error)
if opts.InMemory {
b := bytes.NewBuffer(nil)
tmpWriter = b
getTmpReader = func() (readSeekCloser, int64, error) {
return nopReadSeekCloser{bytes.NewReader(b.Bytes())}, int64(b.Len()), nil
}
} else {
f, err := os.CreateTemp("", "s3-putsnowballobjects-*")
if err != nil {
return err
}
name := f.Name()
tmpWriter = f
var once sync.Once
defer once.Do(func() {
f.Close()
})
defer os.Remove(name)
getTmpReader = func() (readSeekCloser, int64, error) {
once.Do(func() {
f.Close()
})
f, err := os.Open(name)
if err != nil {
return nil, 0, err
}
st, err := f.Stat()
if err != nil {
return nil, 0, err
}
return f, st.Size(), nil
}
}
flush := func() error { return nil }
if !opts.Compress {
if !opts.InMemory {
// Insert buffer for writes.
buf := bufio.NewWriterSize(tmpWriter, 1<<20)
flush = buf.Flush
tmpWriter = buf
}
} else {
s2c := s2.NewWriter(tmpWriter, s2.WriterBetterCompression())
flush = s2c.Close
defer s2c.Close()
tmpWriter = s2c
}
t := tar.NewWriter(tmpWriter)
objectLoop:
for {
select {
case <-ctx.Done():
return ctx.Err()
case obj, ok := <-objs:
if !ok {
break objectLoop
}
closeObj := func() {}
if obj.Close != nil {
closeObj = obj.Close
}
// Trim accidental slash prefix.
obj.Key = strings.TrimPrefix(obj.Key, "/")
header := tar.Header{
Typeflag: tar.TypeReg,
Name: obj.Key,
Size: obj.Size,
ModTime: obj.ModTime,
Format: tar.FormatPAX,
}
if header.ModTime.IsZero() {
header.ModTime = time.Now().UTC()
}
header.PAXRecords = make(map[string]string)
if obj.VersionID != "" {
header.PAXRecords["minio.versionId"] = obj.VersionID
}
for k, vals := range obj.Headers {
header.PAXRecords["minio.metadata."+k] = strings.Join(vals, ",")
}
if err := t.WriteHeader(&header); err != nil {
closeObj()
return err
}
n, err := io.Copy(t, obj.Content)
if err != nil {
closeObj()
if opts.SkipErrs {
continue
}
return err
}
if n != obj.Size {
closeObj()
if opts.SkipErrs {
continue
}
return io.ErrUnexpectedEOF
}
closeObj()
}
}
// Flush tar
err = t.Flush()
if err != nil {
return err
}
// Flush compression
err = flush()
if err != nil {
return err
}
if opts.Opts.UserMetadata == nil {
opts.Opts.UserMetadata = map[string]string{}
}
opts.Opts.UserMetadata["X-Amz-Meta-Snowball-Auto-Extract"] = "true"
opts.Opts.DisableMultipart = true
rc, sz, err := getTmpReader()
if err != nil {
return err
}
defer rc.Close()
rand := c.random.Uint64()
_, err = c.PutObject(ctx, bucketName, fmt.Sprintf("snowball-upload-%x.tar", rand), rc, sz, opts.Opts)
return err
}