diff --git a/pkg/http/subscriber.go b/pkg/http/subscriber.go index d50813e..8157904 100644 --- a/pkg/http/subscriber.go +++ b/pkg/http/subscriber.go @@ -66,7 +66,7 @@ type Subscriber struct { logger watermill.LoggerAdapter - outputChannels []chan *message.Message + outputChannels map[string][]chan *message.Message outputChannelsLock sync.Locker closed bool @@ -89,7 +89,7 @@ func NewSubscriber(addr string, config SubscriberConfig, logger watermill.Logger config: config, server: s, logger: logger, - outputChannels: make([]chan *message.Message, 0), + outputChannels: make(map[string][]chan *message.Message), outputChannelsLock: &sync.Mutex{}, }, nil } @@ -104,7 +104,8 @@ func (s *Subscriber) Subscribe(ctx context.Context, url string) (<-chan *message messages := make(chan *message.Message) s.outputChannelsLock.Lock() - s.outputChannels = append(s.outputChannels, messages) + s.outputChannels[url] = append(s.outputChannels[url], messages) + channels := s.outputChannels[url] s.outputChannelsLock.Unlock() baseLogFields := watermill.LogFields{"url": url, "provider": ProviderName} @@ -133,20 +134,41 @@ func (s *Subscriber) Subscribe(ctx context.Context, url string) (<-chan *message logFields := baseLogFields.Add(watermill.LogFields{"message_uuid": msg.UUID}) - s.logger.Trace("Sending msg", logFields) - messages <- msg + var acked, nacked, cancelled int - s.logger.Trace("Waiting for ACK", logFields) - select { - case <-msg.Acked(): - s.logger.Trace("Message acknowledged", logFields.Add(watermill.LogFields{"err": err})) - w.WriteHeader(http.StatusOK) - case <-msg.Nacked(): - s.logger.Trace("Message nacked", logFields.Add(watermill.LogFields{"err": err})) - w.WriteHeader(http.StatusInternalServerError) - case <-r.Context().Done(): - s.logger.Info("Request stopped without ACK received", logFields) + for index, channel := range channels { + m := msg.Copy() + + m.SetContext(ctx) + + s.logger.Trace("Sending msg", logFields.Add(watermill.LogFields{"index": index})) + channel <- m + s.logger.Trace("Waiting for ACK", logFields.Add(watermill.LogFields{"index": index})) + + select { + case <-m.Acked(): + s.logger.Trace("Message acknowledged", logFields.Add(watermill.LogFields{"index": index})) + acked++ + case <-m.Nacked(): + s.logger.Trace("Message nacked", logFields.Add(watermill.LogFields{"index": index})) + nacked++ + case <-r.Context().Done(): + s.logger.Info("Request stopped without ACK received", logFields.Add(watermill.LogFields{"index": index})) + cancelled++ + } + } + + s.logger.Trace("Messages send", logFields.Add(watermill.LogFields{ + "total": len(channels), + "nacked": nacked, + "acked": acked, + "cancelled": cancelled, + })) + + if (cancelled != 0) || (nacked != 0) || (acked == 0) { w.WriteHeader(http.StatusInternalServerError) + } else { + w.WriteHeader(http.StatusOK) } }) @@ -187,8 +209,10 @@ func (s *Subscriber) Close() error { return err } - for _, ch := range s.outputChannels { - close(ch) + for _, channels := range s.outputChannels { + for _, ch := range channels { + close(ch) + } } return nil