From 393464e87abf01ff520d50e3cd69b4b021b9d6b2 Mon Sep 17 00:00:00 2001 From: Tymoteusz Blazejczyk Date: Tue, 12 Apr 2022 22:18:44 +0200 Subject: [PATCH 1/3] Fix multi subscribe handling for single topic The go-chi router overrides added handlers for the same route. --- pkg/http/subscriber.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/pkg/http/subscriber.go b/pkg/http/subscriber.go index d50813e..2b1cb5c 100644 --- a/pkg/http/subscriber.go +++ b/pkg/http/subscriber.go @@ -66,6 +66,7 @@ type Subscriber struct { logger watermill.LoggerAdapter + handlers map[string][]http.HandlerFunc outputChannels []chan *message.Message outputChannelsLock sync.Locker @@ -91,6 +92,7 @@ func NewSubscriber(addr string, config SubscriberConfig, logger watermill.Logger logger: logger, outputChannels: make([]chan *message.Message, 0), outputChannelsLock: &sync.Mutex{}, + handlers: make(map[string][]http.HandlerFunc), }, nil } @@ -113,7 +115,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, url string) (<-chan *message url = "/" + url } - s.config.Router.Post(url, func(w http.ResponseWriter, r *http.Request) { + s.handlers[url] = append(s.handlers[url], func(w http.ResponseWriter, r *http.Request) { msg, err := s.config.UnmarshalMessageFunc(url, r) if err != nil { @@ -150,6 +152,12 @@ func (s *Subscriber) Subscribe(ctx context.Context, url string) (<-chan *message } }) + s.config.Router.Post(url, func(w http.ResponseWriter, r *http.Request) { + for _, handler := range s.handlers[url] { + handler(w, r) + } + }) + return messages, nil } From d05a48b3b3271983d68113bae4f05731f9b4e69b Mon Sep 17 00:00:00 2001 From: Tymoteusz Blazejczyk Date: Wed, 13 Apr 2022 10:42:42 +0200 Subject: [PATCH 2/3] Add more proper msg distribution for N subscribers --- pkg/http/subscriber.go | 74 +++++++++++++++++++++++++++--------------- 1 file changed, 48 insertions(+), 26 deletions(-) diff --git a/pkg/http/subscriber.go b/pkg/http/subscriber.go index 2b1cb5c..9ffc155 100644 --- a/pkg/http/subscriber.go +++ b/pkg/http/subscriber.go @@ -66,8 +66,7 @@ type Subscriber struct { logger watermill.LoggerAdapter - handlers map[string][]http.HandlerFunc - outputChannels []chan *message.Message + outputChannels map[string][]chan *message.Message outputChannelsLock sync.Locker closed bool @@ -90,9 +89,8 @@ func NewSubscriber(addr string, config SubscriberConfig, logger watermill.Logger config: config, server: s, logger: logger, - outputChannels: make([]chan *message.Message, 0), + outputChannels: make(map[string][]chan *message.Message, 0), outputChannelsLock: &sync.Mutex{}, - handlers: make(map[string][]http.HandlerFunc), }, nil } @@ -106,7 +104,8 @@ func (s *Subscriber) Subscribe(ctx context.Context, url string) (<-chan *message messages := make(chan *message.Message) s.outputChannelsLock.Lock() - s.outputChannels = append(s.outputChannels, messages) + s.outputChannels[url] = append(s.outputChannels[url], messages) + channels := s.outputChannels[url] s.outputChannelsLock.Unlock() baseLogFields := watermill.LogFields{"url": url, "provider": ProviderName} @@ -115,7 +114,7 @@ func (s *Subscriber) Subscribe(ctx context.Context, url string) (<-chan *message url = "/" + url } - s.handlers[url] = append(s.handlers[url], func(w http.ResponseWriter, r *http.Request) { + s.config.Router.Post(url, func(w http.ResponseWriter, r *http.Request) { msg, err := s.config.UnmarshalMessageFunc(url, r) if err != nil { @@ -135,26 +134,47 @@ func (s *Subscriber) Subscribe(ctx context.Context, url string) (<-chan *message logFields := baseLogFields.Add(watermill.LogFields{"message_uuid": msg.UUID}) - s.logger.Trace("Sending msg", logFields) - messages <- msg - - s.logger.Trace("Waiting for ACK", logFields) - select { - case <-msg.Acked(): - s.logger.Trace("Message acknowledged", logFields.Add(watermill.LogFields{"err": err})) - w.WriteHeader(http.StatusOK) - case <-msg.Nacked(): - s.logger.Trace("Message nacked", logFields.Add(watermill.LogFields{"err": err})) - w.WriteHeader(http.StatusInternalServerError) - case <-r.Context().Done(): - s.logger.Info("Request stopped without ACK received", logFields) - w.WriteHeader(http.StatusInternalServerError) + var acked, nacked, cancelled int + + for index, channel := range channels { + m := msg.Copy() + + m.SetContext(ctx) + + s.logger.Trace("Sending msg", logFields) + channel <- m + s.logger.Trace("Waiting for ACK", logFields) + + select { + case <-m.Acked(): + s.logger.Trace("Message acknowledged", logFields.Add(watermill.LogFields{ + "index": index, + })) + acked++ + case <-m.Nacked(): + s.logger.Trace("Message nacked", logFields.Add(watermill.LogFields{ + "index": index, + })) + nacked++ + case <-r.Context().Done(): + s.logger.Info("Request stopped without ACK received", logFields.Add(watermill.LogFields{ + "index": index, + })) + cancelled++ + } } - }) - s.config.Router.Post(url, func(w http.ResponseWriter, r *http.Request) { - for _, handler := range s.handlers[url] { - handler(w, r) + s.logger.Trace("Messages send", logFields.Add(watermill.LogFields{ + "total": len(channels), + "nacked": nacked, + "acked": acked, + "cancelled": cancelled, + })) + + if (cancelled != 0) || (nacked != 0) || (acked == 0) { + w.WriteHeader(http.StatusInternalServerError) + } else { + w.WriteHeader(http.StatusOK) } }) @@ -195,8 +215,10 @@ func (s *Subscriber) Close() error { return err } - for _, ch := range s.outputChannels { - close(ch) + for _, channels := range s.outputChannels { + for _, ch := range channels { + close(ch) + } } return nil From c2329382ceff8e276b1ce091451b0e834a0c1f9a Mon Sep 17 00:00:00 2001 From: Tymoteusz Blazejczyk Date: Wed, 13 Apr 2022 10:48:06 +0200 Subject: [PATCH 3/3] Add channel index in logs. --- pkg/http/subscriber.go | 18 ++++++------------ 1 file changed, 6 insertions(+), 12 deletions(-) diff --git a/pkg/http/subscriber.go b/pkg/http/subscriber.go index 9ffc155..8157904 100644 --- a/pkg/http/subscriber.go +++ b/pkg/http/subscriber.go @@ -89,7 +89,7 @@ func NewSubscriber(addr string, config SubscriberConfig, logger watermill.Logger config: config, server: s, logger: logger, - outputChannels: make(map[string][]chan *message.Message, 0), + outputChannels: make(map[string][]chan *message.Message), outputChannelsLock: &sync.Mutex{}, }, nil } @@ -141,25 +141,19 @@ func (s *Subscriber) Subscribe(ctx context.Context, url string) (<-chan *message m.SetContext(ctx) - s.logger.Trace("Sending msg", logFields) + s.logger.Trace("Sending msg", logFields.Add(watermill.LogFields{"index": index})) channel <- m - s.logger.Trace("Waiting for ACK", logFields) + s.logger.Trace("Waiting for ACK", logFields.Add(watermill.LogFields{"index": index})) select { case <-m.Acked(): - s.logger.Trace("Message acknowledged", logFields.Add(watermill.LogFields{ - "index": index, - })) + s.logger.Trace("Message acknowledged", logFields.Add(watermill.LogFields{"index": index})) acked++ case <-m.Nacked(): - s.logger.Trace("Message nacked", logFields.Add(watermill.LogFields{ - "index": index, - })) + s.logger.Trace("Message nacked", logFields.Add(watermill.LogFields{"index": index})) nacked++ case <-r.Context().Done(): - s.logger.Info("Request stopped without ACK received", logFields.Add(watermill.LogFields{ - "index": index, - })) + s.logger.Info("Request stopped without ACK received", logFields.Add(watermill.LogFields{"index": index})) cancelled++ } }