Cm async poc #19

Open · wants to merge 7 commits into master
2 changes: 1 addition & 1 deletion go.mod
@@ -1,4 +1,4 @@
-module github.com/ThreeDotsLabs/watermill-kafka/v2
+module weavelab.xyz/watermill-kafka

require (
github.com/Shopify/sarama v1.32.0
121 changes: 121 additions & 0 deletions pkg/kafka/publisher_async.go
@@ -0,0 +1,121 @@
package kafka

import (
	"time"

	"github.com/Shopify/sarama"
	"github.com/pkg/errors"
	"go.opentelemetry.io/contrib/instrumentation/github.com/Shopify/sarama/otelsarama"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"
)

type PublisherAsync struct {
config PublisherConfig
producer sarama.AsyncProducer
logger watermill.LoggerAdapter
errorsChan <-chan *sarama.ProducerError
successesChan <-chan *sarama.ProducerMessage

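	// closed is read by Publish and written by Close without synchronization;
	// this POC assumes Publish is not called concurrently with Close.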
closed bool
}

// NewAsyncPublisher creates a new Kafka PublisherAsync.
func NewAsyncPublisher(
config PublisherConfig,
logger watermill.LoggerAdapter,
) (*PublisherAsync, error) {
config.setAsyncDefaults()

if err := config.Validate(); err != nil {
return nil, err
}

if logger == nil {
logger = watermill.NopLogger{}
}

producer, err := sarama.NewAsyncProducer(config.Brokers, config.OverwriteSaramaConfig)
if err != nil {
return nil, errors.Wrap(err, "cannot create Kafka producer")
}

if config.OTELEnabled {
producer = otelsarama.WrapAsyncProducer(config.OverwriteSaramaConfig, producer)
}

return &PublisherAsync{
config: config,
producer: producer,
logger: logger,
errorsChan: producer.Errors(),
successesChan: producer.Successes(),
}, nil
}

func (c *PublisherConfig) setAsyncDefaults() {
if c.OverwriteSaramaConfig == nil {
c.OverwriteSaramaConfig = DefaultSaramaAsyncPublisherConfig()
}
}

func DefaultSaramaAsyncPublisherConfig() *sarama.Config {
config := sarama.NewConfig()

config.Producer.Retry.Max = 10
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
config.Version = sarama.V1_0_0_0
config.Metadata.Retry.Backoff = time.Second * 2
config.ClientID = "watermill"

return config
}

// Publish publishes message(s) to Kafka.
//
// Publish is non-blocking. When Producer.Return.Successes and
// Producer.Return.Errors are enabled (the async defaults), callers must keep
// reading from the Errors and Successes channels, or the producer will
// eventually block.
func (p *PublisherAsync) Publish(topic string, msgs ...*message.Message) error {
if p.closed {
return errors.New("publisher closed")
}

logFields := make(watermill.LogFields, 2)
logFields["topic"] = topic

for _, msg := range msgs {
logFields["message_uuid"] = msg.UUID
p.logger.Trace("Sending message to Kafka", logFields)

kafkaMsg, err := p.config.Marshaler.Marshal(topic, msg)
if err != nil {
return errors.Wrapf(err, "cannot marshal message %s", msg.UUID)
}

p.producer.Input() <- kafkaMsg
}

return nil
}

func (p *PublisherAsync) Close() error {
if p.closed {
return nil
}
p.closed = true

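	// sarama's AsyncProducer.Close blocks until buffered messages are flushed.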
if err := p.producer.Close(); err != nil {
return errors.Wrap(err, "cannot close Kafka producer")
}

return nil
}

func (p *PublisherAsync) Errors() <-chan *sarama.ProducerError {
return p.errorsChan
}

func (p *PublisherAsync) Successes() <-chan *sarama.ProducerMessage {
return p.successesChan
}
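
For context when reviewing, here is a minimal usage sketch (not part of this diff) of the pattern the Publish doc comment requires: drain the Successes and Errors channels so the async producer never blocks. The broker address, topic name, and payload are illustrative; the import path follows the module rename in go.mod above.

package main

import (
	"fmt"

	"github.com/ThreeDotsLabs/watermill"
	"github.com/ThreeDotsLabs/watermill/message"

	"weavelab.xyz/watermill-kafka/pkg/kafka"
)

func main() {
	publisher, err := kafka.NewAsyncPublisher(kafka.PublisherConfig{
		Brokers:   []string{"localhost:9092"}, // illustrative broker address
		Marshaler: kafka.DefaultMarshaler{},
	}, watermill.NopLogger{})
	if err != nil {
		panic(err)
	}

	// With the async defaults (Return.Successes/Errors enabled), both
	// channels must be drained or the producer eventually blocks.
	go func() {
		for pErr := range publisher.Errors() {
			fmt.Printf("publish failed: %v\n", pErr.Err)
		}
	}()
	go func() {
		for range publisher.Successes() {
			// delivery confirmed; ack bookkeeping goes here
		}
	}()

	msg := message.NewMessage(watermill.NewUUID(), []byte("hello"))
	if err := publisher.Publish("example.topic", msg); err != nil {
		fmt.Println("marshal/enqueue error:", err)
	}

	_ = publisher.Close()
}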
200 changes: 200 additions & 0 deletions pkg/kafka/pubsub_async_test.go
@@ -0,0 +1,200 @@
package kafka_test

import (
"context"
"fmt"
"testing"
"time"

"github.com/Shopify/sarama"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/ThreeDotsLabs/watermill"
"github.com/ThreeDotsLabs/watermill-kafka/v2/pkg/kafka"
"github.com/ThreeDotsLabs/watermill/message"
"github.com/ThreeDotsLabs/watermill/message/subscriber"
"github.com/ThreeDotsLabs/watermill/pubsub/tests"
)

func newAsyncPubSub(t *testing.T, marshaler kafka.MarshalerUnmarshaler, consumerGroup string) (*kafka.PublisherAsync, *kafka.Subscriber) {
logger := watermill.NewStdLogger(true, true)

var err error
var publisher *kafka.PublisherAsync

retriesLeft := 5
for {
publishConfig := kafka.DefaultSaramaAsyncPublisherConfig()
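		// The tests below never drain the Successes/Errors channels, so
		// disable result delivery to keep the producer from blocking.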
publishConfig.Producer.Return.Successes = false
publishConfig.Producer.Return.Errors = false
publisher, err = kafka.NewAsyncPublisher(kafka.PublisherConfig{
Brokers: kafkaBrokers(),
Marshaler: marshaler,
OverwriteSaramaConfig: publishConfig,
}, logger)
if err == nil || retriesLeft == 0 {
break
}

retriesLeft--
fmt.Printf("cannot create kafka Publisher: %s, retrying (%d retries left)", err, retriesLeft)
time.Sleep(time.Second * 2)
}
require.NoError(t, err)

saramaConfig := kafka.DefaultSaramaSubscriberConfig()
saramaConfig.Consumer.Offsets.Initial = sarama.OffsetOldest

saramaConfig.Admin.Timeout = time.Second * 30
saramaConfig.Producer.RequiredAcks = sarama.WaitForAll
saramaConfig.ChannelBufferSize = 10240
saramaConfig.Consumer.Group.Heartbeat.Interval = time.Millisecond * 500
saramaConfig.Consumer.Group.Rebalance.Timeout = time.Second * 3

var subscriber *kafka.Subscriber

retriesLeft = 5
for {
subscriber, err = kafka.NewSubscriber(
kafka.SubscriberConfig{
Brokers: kafkaBrokers(),
Unmarshaler: marshaler,
OverwriteSaramaConfig: saramaConfig,
ConsumerGroup: consumerGroup,
InitializeTopicDetails: &sarama.TopicDetail{
NumPartitions: 8,
ReplicationFactor: 1,
},
},
logger,
)
if err == nil || retriesLeft == 0 {
break
}

retriesLeft--
fmt.Printf("cannot create kafka Subscriber: %s, retrying (%d retries left)", err, retriesLeft)
time.Sleep(time.Second * 2)
}

require.NoError(t, err)

return publisher, subscriber
}

func createPubSubWithConsumerGroupAsync(t *testing.T, consumerGroup string) (message.Publisher, message.Subscriber) {
return newAsyncPubSub(t, kafka.DefaultMarshaler{}, consumerGroup)
}

func createPubSubAsync(t *testing.T) (message.Publisher, message.Subscriber) {
	return createPubSubWithConsumerGroupAsync(t, "test")
}

func createPartitionedPubSubAsync(t *testing.T) (message.Publisher, message.Subscriber) {
return newAsyncPubSub(t, kafka.NewWithPartitioningMarshaler(generatePartitionKey), "test")
}

func createNoGroupPubSubAsync(t *testing.T) (message.Publisher, message.Subscriber) {
	return newAsyncPubSub(t, kafka.DefaultMarshaler{}, "")
}

func TestPublishSubscribeAsync(t *testing.T) {
features := tests.Features{
ConsumerGroups: true,
ExactlyOnceDelivery: false,
GuaranteedOrder: false,
Persistent: true,
}

tests.TestPubSub(
t,
features,
createPubSubAsync,
		createPubSubWithConsumerGroupAsync,
)
}

func TestPublishSubscribeAsync_ordered(t *testing.T) {
if testing.Short() {
t.Skip("skipping long tests")
}

tests.TestPubSub(
t,
tests.Features{
ConsumerGroups: true,
ExactlyOnceDelivery: false,
GuaranteedOrder: true,
Persistent: true,
},
createPartitionedPubSubAsync,
		createPubSubWithConsumerGroupAsync,
)
}

func TestNoGroupSubscriberAsync(t *testing.T) {
if testing.Short() {
t.Skip("skipping long tests")
}

tests.TestPubSub(
t,
tests.Features{
ConsumerGroups: false,
ExactlyOnceDelivery: false,
GuaranteedOrder: false,
Persistent: true,
NewSubscriberReceivesOldMessages: true,
},
createNoGroupPubSubAsync,
nil,
)
}

func TestCtxValuesAsync(t *testing.T) {
pub, sub := newAsyncPubSub(t, kafka.DefaultMarshaler{}, "")
topicName := "topic_" + watermill.NewUUID()

var messagesToPublish []*message.Message

for i := 0; i < 20; i++ {
id := watermill.NewUUID()
messagesToPublish = append(messagesToPublish, message.NewMessage(id, nil))
}
err := pub.Publish(topicName, messagesToPublish...)
require.NoError(t, err, "cannot publish message")

messages, err := sub.Subscribe(context.Background(), topicName)
require.NoError(t, err)

receivedMessages, all := subscriber.BulkReadWithDeduplication(messages, len(messagesToPublish), time.Second*10)
require.True(t, all)

expectedPartitionsOffsets := map[int32]int64{}
for _, msg := range receivedMessages {
partition, ok := kafka.MessagePartitionFromCtx(msg.Context())
assert.True(t, ok)

messagePartitionOffset, ok := kafka.MessagePartitionOffsetFromCtx(msg.Context())
assert.True(t, ok)

kafkaMsgTimestamp, ok := kafka.MessageTimestampFromCtx(msg.Context())
assert.True(t, ok)
assert.NotZero(t, kafkaMsgTimestamp)

if expectedPartitionsOffsets[partition] <= messagePartitionOffset {
// kafka partition offset is offset of the last message + 1
expectedPartitionsOffsets[partition] = messagePartitionOffset + 1
}
}
assert.NotEmpty(t, expectedPartitionsOffsets)

offsets, err := sub.PartitionOffset(topicName)
require.NoError(t, err)
assert.NotEmpty(t, offsets)

assert.EqualValues(t, expectedPartitionsOffsets, offsets)

require.NoError(t, pub.Close())
}