Skip to content

Commit

Permalink
Google Cloud Storage leverage CustomTime and Object Lifecycle Rules a…
Browse files Browse the repository at this point in the history
…nd implementation of StoreCleaner interface (#20)
  • Loading branch information
marino39 authored Oct 9, 2024
1 parent 75090a8 commit 46a2f51
Show file tree
Hide file tree
Showing 3 changed files with 136 additions and 17 deletions.
6 changes: 6 additions & 0 deletions cachestore.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ type Store[V any] interface {
GetOrSetWithLockEx(ctx context.Context, key string, getter func(context.Context, string) (V, error), ttl time.Duration) (V, error)
}

type StoreCleaner interface {
// CleanExpiredEvery cleans expired keys every d duration.
// If onError is not nil, it will be called when an error occurs.
CleanExpiredEvery(ctx context.Context, d time.Duration, onError func(err error))
}

type Backend interface {
Apply(*StoreOptions)
}
99 changes: 82 additions & 17 deletions gcstorage/gcstorage.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package gcstorage

import (
"cmp"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"strings"
"time"

"cloud.google.com/go/storage"
Expand All @@ -17,12 +17,15 @@ import (

const DefaultTTL = time.Second * 24 * 60 * 60 // 1 day in seconds

const keySuffix = ".cachestore"

type cacheObject[T any] struct {
Object T `json:"object"`
ExpiresAt time.Time `json:"expires_at"`
}

var _ cachestore.Store[any] = &GCStorage[any]{}
var _ cachestore.StoreCleaner = &GCStorage[any]{}

type GCStorage[V any] struct {
keyPrefix string
Expand Down Expand Up @@ -50,6 +53,20 @@ func NewWithBackend[V any](backend cachestore.Backend, opts ...cachestore.StoreO
return New[V](cfg, cfg.StoreOptions)
}

// New creates a new GCStorage instance.
//
// The GCStorage instance is a cachestore.Store implementation that uses Google Cloud Storage as the backend. The object
// is serialized to JSON and stored in the bucket. The key is prefixed with the keyPrefix and the object is stored in the
// bucket with the keyPrefix + key + `.cachestore`. The object is stored with a custom time of the expiry time.
//
// Please note that: The Google Cloud Storage bucket must have proper Object Lifecycle Management rules to delete
// the objects after expiry automatically. In case the bucket does not have Object Lifecycle Management rules, the
// objects will not be deleted. In that case the objects will be deleted only if CleanExpiredEvery is working in the
// background.
//
// Required Lifecycle Management Rule for automatic deletion:
// 1. Delete object 0+ days since object's custom time
// 2. Name matches suffix '.cachestore'
func New[V any](cfg *Config, opts ...cachestore.StoreOptions) (cachestore.Store[V], error) {
for _, opt := range opts {
opt.Apply(&cfg.StoreOptions)
Expand All @@ -72,29 +89,22 @@ func New[V any](cfg *Config, opts ...cachestore.StoreOptions) (cachestore.Store[
return &GCStorage[V]{
keyPrefix: cfg.KeyPrefix,
defaultKeyExpiry: cfg.DefaultKeyExpiry,

client: client,
bucketHandle: client.Bucket(cfg.Bucket),
}, nil
}

func (g *GCStorage[V]) Exists(ctx context.Context, key string) (bool, error) {
attr, err := g.bucketHandle.Object(g.keyPrefix + key).Attrs(ctx)
attr, err := g.bucketHandle.Object(g.objectKey(key)).Attrs(ctx)
if err != nil {
if errors.Is(err, storage.ErrObjectNotExist) {
return false, nil
}
return false, fmt.Errorf("cachestore/gcstorage: get attrs returned error: %w", err)
}

if attr.Metadata["expires_at"] != "" {
expiresAt, err := time.Parse(time.RFC3339, attr.Metadata["expires_at"])
if err != nil {
return false, fmt.Errorf("cachestore/gcstorage: time parse returned error: %w", err)
}
if !expiresAt.IsZero() && expiresAt.Before(time.Now()) {
return false, nil
}
if !attr.CustomTime.IsZero() && attr.CustomTime.Before(time.Now()) {
return false, nil
}
return true, nil
}
Expand Down Expand Up @@ -124,12 +134,11 @@ func (g *GCStorage[V]) SetEx(ctx context.Context, key string, value V, ttl time.
return err
}

obj := g.bucketHandle.Object(g.keyPrefix + key)
obj := g.bucketHandle.Object(g.objectKey(key))
w := obj.NewWriter(ctx)
w.ObjectAttrs.ContentType = "application/json"
w.ObjectAttrs.Metadata = map[string]string{
"expires_at": expiresAt.Format(time.RFC3339),
}
w.ObjectAttrs.CustomTime = expiresAt

if _, err := w.Write(data); err != nil {
_ = w.Close()
return fmt.Errorf("cachestore/gcstorage: write returned error: %w", err)
Expand All @@ -146,7 +155,7 @@ func (g *GCStorage[V]) Get(ctx context.Context, key string) (V, bool, error) {
}

func (g *GCStorage[V]) GetEx(ctx context.Context, key string) (V, *time.Duration, bool, error) {
obj := g.bucketHandle.Object(g.keyPrefix + key)
obj := g.bucketHandle.Object(g.objectKey(key))
r, err := obj.NewReader(ctx)
if err != nil {
if errors.Is(err, storage.ErrObjectNotExist) {
Expand Down Expand Up @@ -204,7 +213,7 @@ func (g *GCStorage[V]) BatchGet(ctx context.Context, keys []string) ([]V, []bool
}

func (g *GCStorage[V]) Delete(ctx context.Context, key string) error {
return g.bucketHandle.Object(g.keyPrefix + key).Delete(ctx)
return g.bucketHandle.Object(g.objectKey(key)).Delete(ctx)
}

func (g *GCStorage[V]) DeletePrefix(ctx context.Context, keyPrefix string) error {
Expand All @@ -228,6 +237,10 @@ func (g *GCStorage[V]) DeletePrefix(ctx context.Context, keyPrefix string) error
return fmt.Errorf("cachestore/gcstorage: it next error: %w", err)
}

if !strings.HasSuffix(objAttrs.Name, keySuffix) {
continue
}

if err = g.bucketHandle.Object(objAttrs.Name).Delete(ctx); err != nil {
return fmt.Errorf("cachestore/gcstorage: object delete error: %w", err)
}
Expand All @@ -250,6 +263,10 @@ func (g *GCStorage[V]) ClearAll(ctx context.Context) error {
return fmt.Errorf("cachestore/gcstorage: it next error: %w", err)
}

if !strings.HasSuffix(objAttrs.Name, keySuffix) {
continue
}

if err = g.bucketHandle.Object(objAttrs.Name).Delete(ctx); err != nil {
return fmt.Errorf("cachestore/gcstorage: object delete error: %w", err)
}
Expand All @@ -265,6 +282,53 @@ func (g *GCStorage[V]) GetOrSetWithLockEx(ctx context.Context, key string, gette
return *new(V), cachestore.ErrNotSupported
}

func (g *GCStorage[V]) CleanExpiredEvery(ctx context.Context, d time.Duration, onError func(err error)) {
ticker := time.NewTicker(d)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
objIt := g.bucketHandle.Objects(ctx, &storage.Query{
Prefix: g.keyPrefix,
})

for {
objAttrs, err := objIt.Next()
if err != nil {
if errors.Is(err, iterator.Done) {
break
}
return
}

if !strings.HasSuffix(objAttrs.Name, keySuffix) {
continue
}

if objAttrs.CustomTime.IsZero() {
continue
}

if objAttrs.CustomTime.Before(time.Now()) {
if err = g.bucketHandle.Object(objAttrs.Name).Delete(ctx); err != nil {
if onError != nil {
onError(fmt.Errorf("cachestore/gcstorage: delete error: %w", err))
}
continue
}
}
}
}
}
}

func (g *GCStorage[V]) objectKey(key string) string {
return fmt.Sprintf("%s%s%s", g.keyPrefix, key, keySuffix)
}

func serialize[V any](value V) ([]byte, error) {
return json.Marshal(value)
}
Expand All @@ -276,4 +340,5 @@ func deserialize[V any](data []byte) (V, error) {
}

return out, nil

}
48 changes: 48 additions & 0 deletions gcstorage/gcstorage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"testing"
"time"

"cloud.google.com/go/storage"
"github.com/goware/cachestore"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -101,6 +103,23 @@ func TestGCStorage(t *testing.T) {
require.False(t, ok)
})

t.Run("Exists_Expiry", func(t *testing.T) {
ctx := context.Background()

err = store.SetEx(ctx, "foo", "bar", 1*time.Second)
require.NoError(t, err)

ok, err := store.Exists(ctx, "foo")
require.NoError(t, err)
require.True(t, ok)

time.Sleep(2 * time.Second)

ok, err = store.Exists(ctx, "foo")
require.NoError(t, err)
require.False(t, ok)
})

t.Run("Delete", func(t *testing.T) {
ctx := context.Background()

Expand Down Expand Up @@ -181,4 +200,33 @@ func TestGCStorage(t *testing.T) {
require.Equal(t, []string{"", "", ""}, values)
require.Equal(t, []bool{false, false, false}, exists)
})

t.Run("CleanExpiredEvery", func(t *testing.T) {
ctx := context.Background()

// Set a key with 1 second expiry
err = store.SetEx(ctx, "foo", "bar", 1*time.Second)
require.NoError(t, err)

time.Sleep(2 * time.Second)

// Start the cleaner
cCtx, cancel := context.WithCancel(ctx)
defer cancel()

storeCleaner, ok := store.(cachestore.StoreCleaner)
require.True(t, ok)

go storeCleaner.CleanExpiredEvery(cCtx, 1*time.Second, nil)

// Wait for cleaner to clean the expired key
time.Sleep(2 * time.Second)

// Check if the key is deleted
gcsClient, err := storage.NewClient(ctx)
require.NoError(t, err)

_, err = gcsClient.Bucket("my-bucket").Object("test/foo.cachestore").Attrs(ctx)
require.ErrorIs(t, err, storage.ErrObjectNotExist)
})
}

0 comments on commit 46a2f51

Please sign in to comment.