From ba274acc938dd76a4bed20316ee13af5dfb96379 Mon Sep 17 00:00:00 2001 From: ytimocin Date: Wed, 23 Oct 2024 18:19:17 -0700 Subject: [PATCH] Adding copy operation to Azure Blob Storage and AWS S3 bindings Signed-off-by: ytimocin --- bindings/aws/s3/metadata.yaml | 2 + bindings/aws/s3/s3.go | 54 +++++++++- bindings/azure/blobstorage/blobstorage.go | 118 +++++++++++++++++++++- bindings/azure/blobstorage/metadata.yaml | 2 + bindings/gcp/bucket/bucket.go | 34 ++++++- bindings/gcp/bucket/bucket_test.go | 8 +- bindings/requests.go | 1 + 7 files changed, 206 insertions(+), 13 deletions(-) diff --git a/bindings/aws/s3/metadata.yaml b/bindings/aws/s3/metadata.yaml index e33fcbad3c..b18720a8db 100644 --- a/bindings/aws/s3/metadata.yaml +++ b/bindings/aws/s3/metadata.yaml @@ -19,6 +19,8 @@ binding: description: "Delete blob" - name: list description: "List blob" + - name: copy + description: "Copy blob" capabilities: [] builtinAuthenticationProfiles: - name: "aws" diff --git a/bindings/aws/s3/s3.go b/bindings/aws/s3/s3.go index cc67cec94f..ea40fa47b0 100644 --- a/bindings/aws/s3/s3.go +++ b/bindings/aws/s3/s3.go @@ -157,6 +157,7 @@ func (s *AWSS3) Operations() []bindings.OperationKind { bindings.GetOperation, bindings.DeleteOperation, bindings.ListOperation, + bindings.CopyOperation, presignOperation, } } @@ -240,7 +241,7 @@ func (s *AWSS3) create(ctx context.Context, req *bindings.InvokeRequest) (*bindi }, nil } -func (s *AWSS3) presign(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { +func (s *AWSS3) presign(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { metadata, err := s.metadata.mergeWithRequestMetadata(req) if err != nil { return nil, fmt.Errorf("s3 binding error: error merging metadata: %w", err) @@ -389,6 +390,53 @@ func (s *AWSS3) list(ctx context.Context, req *bindings.InvokeRequest) (*binding }, nil } +func (s *AWSS3) copy(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { + _, err := 
s.metadata.mergeWithRequestMetadata(req) + if err != nil { + return nil, fmt.Errorf("s3 binding error: error merging metadata: %w", err) + } + + source := req.Metadata["source"] + if source == "" { + return nil, fmt.Errorf("s3 binding error: required metadata 'source' missing") + } + + destinationBucket := req.Metadata["bucket"] + if destinationBucket == "" { + return nil, fmt.Errorf("s3 binding error: required metadata 'bucket' missing") + } + + destinationKey := req.Metadata["destinationKey"] + if destinationKey == "" { + return nil, fmt.Errorf("s3 binding error: required metadata 'destinationKey' missing") + } + + _, err = s.s3Client.CopyObject(&s3.CopyObjectInput{ + // Bucket is the destination bucket. + Bucket: ptr.Of(destinationBucket), + + // CopySource is the source bucket and key. + CopySource: ptr.Of(source), + + // Key is the key of the destination object. + Key: ptr.Of(destinationKey), + + // MetadataDirective is the directive to apply to the metadata of the destination object. 
+ MetadataDirective: ptr.Of(s3.MetadataDirectiveCopy), + }) + if err != nil { + return nil, fmt.Errorf("s3 binding error: copy operation failed: %w", err) + } + + return &bindings.InvokeResponse{ + Metadata: map[string]string{ + "source": source, + "destinationBucket": destinationBucket, + "destinationKey": destinationKey, + }, + }, nil +} + func (s *AWSS3) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { switch req.Operation { case bindings.CreateOperation: @@ -399,8 +447,10 @@ func (s *AWSS3) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindi return s.delete(ctx, req) case bindings.ListOperation: return s.list(ctx, req) + case bindings.CopyOperation: + return s.copy(req) case presignOperation: - return s.presign(ctx, req) + return s.presign(req) default: return nil, fmt.Errorf("s3 binding error: unsupported operation %s", req.Operation) } diff --git a/bindings/azure/blobstorage/blobstorage.go b/bindings/azure/blobstorage/blobstorage.go index b132f9bf4a..033deaa91d 100644 --- a/bindings/azure/blobstorage/blobstorage.go +++ b/bindings/azure/blobstorage/blobstorage.go @@ -22,17 +22,19 @@ import ( "io" "reflect" "strconv" + "time" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob" "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas" "github.com/google/uuid" "github.com/dapr/components-contrib/bindings" storagecommon "github.com/dapr/components-contrib/common/component/azure/blobstorage" - contribMetadata "github.com/dapr/components-contrib/metadata" + contribmetadata "github.com/dapr/components-contrib/metadata" "github.com/dapr/kit/logger" "github.com/dapr/kit/ptr" ) @@ -57,8 +59,9 @@ const ( // Specifies the maximum number of blobs to return, 
including all BlobPrefix elements. If the request does not // specify maxresults the server will return up to 5,000 items. // See: https://docs.microsoft.com/en-us/rest/api/storageservices/list-blobs#uri-parameters - maxResults int32 = 5000 - endpointKey = "endpoint" + maxResults int32 = 5000 + endpointKey = "endpoint" + presignOperation = "presign" ) var ErrMissingBlobName = errors.New("blobName is a required attribute") @@ -76,6 +79,10 @@ type createResponse struct { BlobName string `json:"blobName"` } +type presignResponse struct { + PresignURL string `json:"presignURL"` +} + type listInclude struct { Copy bool `json:"copy"` Metadata bool `json:"metadata"` @@ -112,6 +119,7 @@ func (a *AzureBlobStorage) Operations() []bindings.OperationKind { bindings.GetOperation, bindings.DeleteOperation, bindings.ListOperation, + bindings.CopyOperation, } } @@ -344,6 +352,102 @@ func (a *AzureBlobStorage) list(ctx context.Context, req *bindings.InvokeRequest }, nil } +func (a *AzureBlobStorage) copy(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { + sourceBlobName := req.Metadata["sourceBlobName"] + if sourceBlobName == "" { + return nil, fmt.Errorf("azure blob storage error: required metadata 'sourceBlobName' missing") + } + + destinationBlobName := req.Metadata["destinationBlobName"] + if destinationBlobName == "" { + return nil, fmt.Errorf("azure blob storage error: required metadata 'destinationBlobName' missing") + } + + sourceContainerName := req.Metadata["sourceContainerName"] + if sourceContainerName == "" { + return nil, fmt.Errorf("azure blob storage error: required metadata 'sourceContainerName' missing") + } + + destinationContainerName := req.Metadata["destinationContainerName"] + if destinationContainerName == "" { + return nil, fmt.Errorf("azure blob storage error: required metadata 'destinationContainerName' missing") + } + + sourceBlobClient := a.containerClient.NewBlockBlobClient(fmt.Sprintf("%s/%s", sourceContainerName, 
sourceBlobName)) + destinationBlobClient := a.containerClient.NewBlockBlobClient(fmt.Sprintf("%s/%s", destinationContainerName, destinationBlobName)) + + copyURL := sourceBlobClient.URL() + _, err := destinationBlobClient.StartCopyFromURL(ctx, copyURL, nil) + if err != nil { + return nil, fmt.Errorf("azure blob storage error: copy operation failed: %w", err) + } + + return &bindings.InvokeResponse{ + Metadata: map[string]string{ + "sourceBlobName": sourceBlobName, + "destinationBlobName": destinationBlobName, + }, + }, nil +} + +func (a *AzureBlobStorage) presign(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { + blobName := req.Metadata[metadataKeyBlobName] + if blobName == "" { + return nil, fmt.Errorf("azure blob storage error: required metadata '%s' missing", metadataKeyBlobName) + } + + presignTTL := req.Metadata["presignTTL"] + if presignTTL == "" { + return nil, fmt.Errorf("azure blob storage error: required metadata 'presignTTL' missing") + } + + ttl, err := time.ParseDuration(presignTTL) + if err != nil { + return nil, fmt.Errorf("azure blob storage error: cannot parse duration %s: %w", presignTTL, err) + } + + blobClient := a.containerClient.NewBlockBlobClient(blobName) + sasURL, err := a.generateSASURL(blobClient, ttl) + if err != nil { + return nil, fmt.Errorf("azure blob storage error: %w", err) + } + + jsonResponse, err := json.Marshal(presignResponse{ + PresignURL: sasURL, + }) + if err != nil { + return nil, fmt.Errorf("azure blob storage error: error marshalling presign response: %w", err) + } + + return &bindings.InvokeResponse{ + Data: jsonResponse, + }, nil +} + +func (a *AzureBlobStorage) generateSASURL(blobClient *blockblob.Client, ttl time.Duration) (string, error) { + permissions := sas.AccountPermissions{ + Read: true, + } + + sasValues := sas.AccountSignatureValues{ + Protocol: sas.ProtocolHTTPS, + ExpiryTime: time.Now().UTC().Add(ttl), + Permissions: permissions.String(), + } + + credential, err := 
azblob.NewSharedKeyCredential(a.metadata.AccountName, a.metadata.AccountKey) + if err != nil { + return "", fmt.Errorf("error creating shared key credential: %w", err) + } + + sasQueryParams, err := sasValues.SignWithSharedKey(credential) + if err != nil { + return "", fmt.Errorf("error generating SAS query parameters: %w", err) + } + + return fmt.Sprintf("%s?%s", blobClient.URL(), sasQueryParams.Encode()), nil +} + func (a *AzureBlobStorage) Invoke(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { switch req.Operation { case bindings.CreateOperation: @@ -354,6 +458,10 @@ func (a *AzureBlobStorage) Invoke(ctx context.Context, req *bindings.InvokeReque return a.delete(ctx, req) case bindings.ListOperation: return a.list(ctx, req) + case bindings.CopyOperation: + return a.copy(ctx, req) + case presignOperation: + return a.presign(req) default: return nil, fmt.Errorf("unsupported operation %s", req.Operation) } @@ -371,9 +479,9 @@ func (a *AzureBlobStorage) isValidDeleteSnapshotsOptionType(accessType azblob.De } // GetComponentMetadata returns the metadata of the component. 
-func (a *AzureBlobStorage) GetComponentMetadata() (metadataInfo contribMetadata.MetadataMap) { +func (a *AzureBlobStorage) GetComponentMetadata() (metadataInfo contribmetadata.MetadataMap) { metadataStruct := storagecommon.BlobStorageMetadata{} - contribMetadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, contribMetadata.BindingType) + contribmetadata.GetMetadataInfoFromStructType(reflect.TypeOf(metadataStruct), &metadataInfo, contribmetadata.BindingType) return } diff --git a/bindings/azure/blobstorage/metadata.yaml b/bindings/azure/blobstorage/metadata.yaml index 0c436bb02e..ec28df2738 100644 --- a/bindings/azure/blobstorage/metadata.yaml +++ b/bindings/azure/blobstorage/metadata.yaml @@ -19,6 +19,8 @@ binding: description: "Delete blob" - name: list description: "List blob" + - name: copy + description: "Copy blob" capabilities: [] builtinAuthenticationProfiles: - name: "azuread" diff --git a/bindings/gcp/bucket/bucket.go b/bindings/gcp/bucket/bucket.go index 2a8b5e1faa..b3ecfcd19c 100644 --- a/bindings/gcp/bucket/bucket.go +++ b/bindings/gcp/bucket/bucket.go @@ -137,6 +137,7 @@ func (g *GCPStorage) Operations() []bindings.OperationKind { bindings.GetOperation, bindings.DeleteOperation, bindings.ListOperation, + bindings.CopyOperation, signOperation, } } @@ -153,8 +154,10 @@ func (g *GCPStorage) Invoke(ctx context.Context, req *bindings.InvokeRequest) (* return g.delete(ctx, req) case bindings.ListOperation: return g.list(ctx, req) + case bindings.CopyOperation: + return g.copy(ctx, req) case signOperation: - return g.sign(ctx, req) + return g.sign(req) default: return nil, fmt.Errorf("unsupported operation %s", req.Operation) } @@ -307,6 +310,33 @@ func (g *GCPStorage) list(ctx context.Context, req *bindings.InvokeRequest) (*bi }, nil } +func (g *GCPStorage) copy(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { + sourceKey := req.Metadata["sourceKey"] + if sourceKey == "" { + return nil, 
fmt.Errorf("gcp bucket binding error: required metadata 'sourceKey' missing") + } + + destinationKey := req.Metadata["destinationKey"] + if destinationKey == "" { + return nil, fmt.Errorf("gcp bucket binding error: required metadata 'destinationKey' missing") + } + + src := g.client.Bucket(g.metadata.Bucket).Object(sourceKey) + dst := g.client.Bucket(g.metadata.Bucket).Object(destinationKey) + + _, err := dst.CopierFrom(src).Run(ctx) + if err != nil { + return nil, fmt.Errorf("gcp bucket binding error: copy operation failed: %w", err) + } + + return &bindings.InvokeResponse{ + Metadata: map[string]string{ + "sourceKey": sourceKey, + "destinationKey": destinationKey, + }, + }, nil +} + func (g *GCPStorage) Close() error { return g.client.Close() } @@ -345,7 +375,7 @@ func (g *GCPStorage) GetComponentMetadata() (metadataInfo metadata.MetadataMap) return } -func (g *GCPStorage) sign(ctx context.Context, req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { +func (g *GCPStorage) sign(req *bindings.InvokeRequest) (*bindings.InvokeResponse, error) { metadata, err := g.metadata.mergeWithRequestMetadata(req) if err != nil { return nil, fmt.Errorf("gcp binding error. 
error merge metadata : %w", err) diff --git a/bindings/gcp/bucket/bucket_test.go b/bindings/gcp/bucket/bucket_test.go index 6922050acb..098d463f0a 100644 --- a/bindings/gcp/bucket/bucket_test.go +++ b/bindings/gcp/bucket/bucket_test.go @@ -76,10 +76,10 @@ func TestParseMetadata(t *testing.T) { t.Run("check backward compatibility", func(t *testing.T) { gs := GCPStorage{logger: logger.NewLogger("test")} - request := bindings.InvokeRequest{} - request.Operation = bindings.CreateOperation - request.Metadata = map[string]string{ - "name": "my_file.txt", + request := bindings.InvokeRequest{ + Metadata: map[string]string{ + "name": "my_file.txt", + }, } result := gs.handleBackwardCompatibilityForMetadata(request.Metadata) assert.NotEmpty(t, result["key"]) diff --git a/bindings/requests.go b/bindings/requests.go index 385ae4ecc6..9d4ea2bbcb 100644 --- a/bindings/requests.go +++ b/bindings/requests.go @@ -34,6 +34,7 @@ const ( CreateOperation OperationKind = "create" DeleteOperation OperationKind = "delete" ListOperation OperationKind = "list" + CopyOperation OperationKind = "copy" ) // GetMetadataAsBool parses metadata as bool.