Skip to content

Commit

Permalink
Rewrite akka type urls for routing
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Nov 8, 2024
1 parent 989e90e commit ec9c22c
Show file tree
Hide file tree
Showing 6 changed files with 22 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,10 @@ object AnySupport {
def stripJsonTypeUrlPrefix(typeUrl: String): String =
typeUrl.stripPrefix(AkkaJsonTypeUrlPrefix).stripPrefix(KalixJsonTypeUrlPrefix)

def replaceAkkaJsonPrefix(typeUrl: String): String =
if (typeUrl.startsWith(AkkaJsonTypeUrlPrefix)) KalixJsonTypeUrlPrefix + typeUrl.stripPrefix(AkkaJsonTypeUrlPrefix)
else typeUrl

sealed abstract class Primitive[T: ClassTag] {
val name: String = fieldType.name().toLowerCase(Locale.ROOT)
val fullName: String = KalixPrimitive + name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,12 @@ package kalix.javasdk.impl.action

import akka.NotUsed
import akka.stream.javadsl.Source
import com.google.protobuf.any.{ Any => ScalaPbAny }
import kalix.javasdk.action.{ Action, MessageEnvelope }
import com.google.protobuf.any.{Any => ScalaPbAny}
import kalix.javasdk.action.{Action, MessageEnvelope}
import kalix.javasdk.impl.AnySupport
import kalix.javasdk.impl.AnySupport.ProtobufEmptyTypeUrl
import kalix.javasdk.impl.reflection.Reflect
import kalix.javasdk.impl.{ CommandHandler, InvocationContext }
import kalix.javasdk.impl.{CommandHandler, InvocationContext}

// TODO: abstract away reactor dependency
import reactor.core.publisher.Flux
Expand All @@ -34,7 +35,7 @@ class ReflectiveActionRouter[A <: Action](
commandHandler.requestMessageDescriptor,
message.metadata())

val inputTypeUrl = message.payload().asInstanceOf[ScalaPbAny].typeUrl
val inputTypeUrl = AnySupport.replaceAkkaJsonPrefix(message.payload().asInstanceOf[ScalaPbAny].typeUrl)
val methodInvoker = commandHandler.lookupInvoker(inputTypeUrl)

// lookup ComponentClient
Expand Down Expand Up @@ -79,7 +80,7 @@ class ReflectiveActionRouter[A <: Action](
componentMethod.requestMessageDescriptor,
message.metadata())

val inputTypeUrl = message.payload().asInstanceOf[ScalaPbAny].typeUrl
val inputTypeUrl = AnySupport.replaceAkkaJsonPrefix(message.payload().asInstanceOf[ScalaPbAny].typeUrl)
componentMethod.lookupInvoker(inputTypeUrl) match {
case Some(methodInvoker) =>
val response = methodInvoker.invoke(action, context).asInstanceOf[Flux[Action.Effect[_]]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,12 @@

package kalix.javasdk.impl.eventsourcedentity

import com.google.protobuf.any.{ Any => ScalaPbAny }
import com.google.protobuf.{ Any => JavaPbAny }
import com.google.protobuf.any.{Any => ScalaPbAny}
import com.google.protobuf.{Any => JavaPbAny}
import kalix.javasdk.JsonSupport
import kalix.javasdk.eventsourcedentity.CommandContext
import kalix.javasdk.eventsourcedentity.EventSourcedEntity
import kalix.javasdk.impl.AnySupport
import kalix.javasdk.impl.CommandHandler
import kalix.javasdk.impl.InvocationContext
import kalix.javasdk.impl.JsonMessageCodec
Expand Down Expand Up @@ -70,7 +71,7 @@ class ReflectiveEventSourcedEntityRouter[S, E, ES <: EventSourcedEntity[S, E]](
commandHandler.requestMessageDescriptor,
commandContext.metadata())

val inputTypeUrl = command.asInstanceOf[ScalaPbAny].typeUrl
val inputTypeUrl = AnySupport.replaceAkkaJsonPrefix(command.asInstanceOf[ScalaPbAny].typeUrl)
val methodInvoker = commandHandler
.getInvoker(inputTypeUrl)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@
package kalix.javasdk.impl.valueentity

import java.lang.reflect.ParameterizedType

import com.google.protobuf.any.{ Any => ScalaPbAny }
import com.google.protobuf.any.{Any => ScalaPbAny}
import kalix.javasdk.JsonSupport
import kalix.javasdk.impl.AnySupport
import kalix.javasdk.impl.CommandHandler
import kalix.javasdk.impl.InvocationContext
import kalix.javasdk.valueentity.CommandContext
Expand Down Expand Up @@ -36,7 +36,7 @@ class ReflectiveValueEntityRouter[S, E <: ValueEntity[S]](
commandHandler.requestMessageDescriptor,
commandContext.metadata())

val inputTypeUrl = command.asInstanceOf[ScalaPbAny].typeUrl
val inputTypeUrl = AnySupport.replaceAkkaJsonPrefix(command.asInstanceOf[ScalaPbAny].typeUrl)

commandHandler
.getInvoker(inputTypeUrl)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,10 @@ package kalix.javasdk.impl.view

import java.lang.reflect.ParameterizedType
import java.util.{ Map => JMap }

import scala.jdk.CollectionConverters._

import com.google.protobuf.any.{ Any => ScalaPbAny }
import kalix.javasdk.JsonSupport
import kalix.javasdk.impl.AnySupport
import kalix.javasdk.impl.AnySupport.ProtobufEmptyTypeUrl
import kalix.javasdk.impl.CommandHandler
import kalix.javasdk.impl.ComponentDescriptorFactory
Expand Down Expand Up @@ -51,7 +50,7 @@ class ReflectiveViewRouter[S, V <: View[S]](
val commandHandler = commandHandlerLookup(commandName)

val anyEvent = event.asInstanceOf[ScalaPbAny]
val inputTypeUrl = anyEvent.typeUrl
val inputTypeUrl = AnySupport.replaceAkkaJsonPrefix(anyEvent.typeUrl)
val methodInvoker = commandHandler.lookupInvoker(inputTypeUrl)

methodInvoker match {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,8 @@

package kalix.javasdk.impl.workflow

import com.google.protobuf.any.{ Any => ScalaPbAny }
import com.google.protobuf.any.{Any => ScalaPbAny}
import kalix.javasdk.impl.AnySupport
import kalix.javasdk.impl.CommandHandler
import kalix.javasdk.impl.InvocationContext
import kalix.javasdk.workflow.AbstractWorkflow
Expand Down Expand Up @@ -36,7 +37,7 @@ class ReflectiveWorkflowRouter[S, W <: Workflow[S]](
commandHandler.requestMessageDescriptor,
commandContext.metadata())

val inputTypeUrl = command.asInstanceOf[ScalaPbAny].typeUrl
val inputTypeUrl = AnySupport.replaceAkkaJsonPrefix(command.asInstanceOf[ScalaPbAny].typeUrl)

commandHandler
.getInvoker(inputTypeUrl)
Expand Down

0 comments on commit ec9c22c

Please sign in to comment.