Tutorial on Microservices with Kotlin, SpringBoot, Akka and Docker – part 6

Intro

In the last post we began working with Akka through the akkaserver service. Now we will add its companion service, the akkaclient. We will simulate some CRUD requests made by the client to the server, using the same UserDTO defined for the persistenceservice.
Before proceeding, I suggest some reading about the Actor Model in general, and Akka in particular, for those who are not familiar with them.
As always, you may download the full code from the repository:

 git clone https://github.com/gitgabrio/container-microservices-tutorial-6.git

actorclient-service

Let’s add the module declaration to the parent pom:

...
   </parent>
    <modules>
        <module>servicecommon</module>
        <module>registrationservice</module>
        <module>persistenceservice</module>
        <module>timeconsumingservice</module>
        <module>configurationservice</module>
        <module>actorserverservice</module>
        <module>actorclientservice</module>
        <module>docker</module>
    </modules>
    <properties>
...

This is the actorclient-service pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>container-microservice</artifactId>
        <groupId>net.microservices.tutorial</groupId>
        <version>0.1</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>actorclientservice</artifactId>
    <properties>
        <akka.version>2.5.3</akka.version>
        <!-- CONFIGURATIONS -->
        <start-class>net.microservices.tutorial.actorclientservice.ActorClientServer</start-class>
        <finalName>actorclientservice-${project.version}</finalName>
        <service.port>6666</service.port>
        <rs.port>1111</rs.port>
        <rs.ip>localhost</rs.ip>
        <akka.port>12553</akka.port>
    </properties>

    <dependencies>
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>servicecommon</artifactId>
            <version>0.1</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.11</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.11</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>
</project>

As you may see, it is almost the same as the akkaserver’s.

code

To implement the communication between the two actors, we will define the actions (to simulate a full CRUD implementation) and a couple of DTOs that will be the actual messages exchanged between the services. Since both services need these classes, we will put them in the servicecommon module:

net/microservices/tutorial/commands/Command.kt

...
enum class Command {
    CREATE,
    READ,
    UPDATE,
    DELETE
}

net/microservices/tutorial/messages/AkkaMessage.kt

...
open class AkkaMessage(val user: UserDTO, val command: Command, val id: Int) : Serializable {

    override fun toString(): String{
        return "AkkaMessage(command=$command, user=$user, id=$id)"
    }
}

net/microservices/tutorial/messages/AkkaResponse.kt

...
open class AkkaResponse(val done: Boolean, val id: Int): Serializable {
    override fun toString(): String{
        return "AkkaResponse(done=$done, id=$id)"
    }
}

Please note a couple of details:

  1. both classes implement Serializable so that they can be transmitted (UserDTO is Serializable, too, via inheritance)
  2. the id field is used to map the Response to the original Message, since communication inside Akka is completely asynchronous
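
The first point can be checked without starting Akka. The sketch below assumes that plain Java serialization carries the messages, and it uses two hypothetical stand-ins (SampleUser and SampleMessage) instead of the real UserDTO and AkkaMessage, whose full definitions are not shown here. It writes a message to bytes and reads it back, as a remote hop would:

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.Serializable

// Hypothetical stand-in for UserDTO; the real class lives in servicecommon.
data class SampleUser(val id: Int, val name: String) : Serializable

// Hypothetical stand-in for AkkaMessage, with the command reduced to a String.
data class SampleMessage(val user: SampleUser, val command: String, val id: Int) : Serializable

// Serializes a value to a byte array and deserializes it again.
// Throws NotSerializableException if any nested field is not Serializable.
@Suppress("UNCHECKED_CAST")
fun <T : Serializable> roundTrip(value: T): T {
    val buffer = ByteArrayOutputStream()
    ObjectOutputStream(buffer).use { output -> output.writeObject(value) }
    return ObjectInputStream(ByteArrayInputStream(buffer.toByteArray())).use { input ->
        input.readObject() as T
    }
}
```

Calling roundTrip on a SampleMessage should give back an equal copy. Removing Serializable from SampleUser makes the same call fail, which is why the nested DTO has to be Serializable as well.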

Now, we will simulate the situation where the ClientActor is driven by some other code to send CRUD operations to the server (similar to a frontend layer issuing database operations to the backend layer). The “frontend” role will be implemented inside the configuration class (pretty improper, I know):

net/microservices/tutorial/actorclientservice/configurations/ActorClientConfiguration.kt:

...
@Bean
    open fun actorSystem(): ActorSystem {
        val hostName = eurekaClient?.applicationInfoManager?.info?.ipAddr ?: "hostname"
        val defaultApplication: Config = ConfigFactory.defaultApplication()
                .withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(hostName))
                .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(akkaPort))
        val system = ActorSystem.create("Client", defaultApplication)
        val clientActor: ActorRef = system.actorOf(Props.create(ClientActor::class.java, eurekaClient!!, actorServerServiceName!!), "clientActor")
        val r: Random = Random()
        var counter: Int = 1
        system.scheduler().schedule(Duration.create(1, SECONDS),
                Duration.create(1, SECONDS), Runnable {
            val userDto : UserDTO = createUserDTO(r.nextInt(50))
            val command: Command = Command.values()[r.nextInt(4)]
            clientActor.tell(AkkaMessage(userDto, command, counter), null)
            counter ++
        }, system.dispatcher())
        return system
    }

We create a UserDTO with a random id between 0 and 49, and we also pick the operation to execute at random:

...
val userDto : UserDTO = createUserDTO(r.nextInt(50))
val command: Command = Command.values()[r.nextInt(4)]
 ...
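
A small variation on the two lines above, as a sketch only: the bound for the random command is taken from the enum itself instead of the literal 4, so it keeps working if a command is added later. The helper names randomCommand and randomUserId are hypothetical, and the enum is repeated so that the snippet compiles on its own:

```kotlin
import java.util.Random

// Same values as the Command enum in servicecommon, repeated for self-containment.
enum class Command { CREATE, READ, UPDATE, DELETE }

// Picks one command uniformly at random; the bound follows the enum size.
fun randomCommand(random: Random): Command {
    val commands = Command.values()
    return commands[random.nextInt(commands.size)]
}

// Picks a user id in the range 0..49, matching r.nextInt(50) in the configuration class.
fun randomUserId(random: Random): Int = random.nextInt(50)
```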

Then, we tell the client actor to send a new AkkaMessage with:

  1. the UserDTO just created
  2. the Command to execute
  3. the progressive id of the message
...
clientActor.tell(AkkaMessage(userDto, command, counter), null)
 ...

The hard work is done by the ClientActor:

net/microservices/tutorial/actorclientservice/actors/ClientActor.kt:

...
open class ClientActor(val eurekaClient: EurekaClient, val actorServerServiceName: String) : AbstractActor() {

    protected var logger = Logger.getLogger(ClientActor::class.java.simpleName)

    init {
        sendIdentifyRequest()
    }
...

    fun sendIdentifyRequest() {
        val actorServerApplication: Application? = eurekaClient.getApplication(actorServerServiceName)
        actorServerApplication?.shuffleAndStoreInstances(true)
        val instances: List<InstanceInfo>? = actorServerApplication?.instances
        var instanceInfo: InstanceInfo? = null
        if (instances != null && instances.size > 0) {
            instanceInfo = instances[0]
        }
        val ipAddr: String? = instanceInfo?.ipAddr
        val servicePort: String? = instanceInfo?.metadata?.get("port")
        if (notNull(ipAddr, servicePort)) {
            val serviceUrl = "$ipAddr:$servicePort"
            val path = "akka.tcp://RemoteWorkerSystem@$serviceUrl/user/serverActor"
            logger.info("Sending Identify Request to " + path)
            context.actorSelection(path).tell(Identify(path), self)
            context.system().scheduler()
                    .scheduleOnce(Duration.create(5, SECONDS), self(),
                            ReceiveTimeout.getInstance(), context.dispatcher(), self())
        } else {
            logger.warning("Cannot find remote path for $actorServerServiceName; retry in 5 seconds...")
            Timer().schedule(object : TimerTask() {
                override fun run() {
                    logger.info("Retry....")
                    sendIdentifyRequest()
                }
            }, 5000)
        }
    }

}

As soon as the class is instantiated, we send an Identify request to the AkkaServer. To do that, we first retrieve the server’s coordinates from the registrationservice using the EurekaClient instance:

...
val actorServerApplication: Application? = eurekaClient.getApplication(actorServerServiceName)
        actorServerApplication?.shuffleAndStoreInstances(true)
        val instances: List<InstanceInfo>? = actorServerApplication?.instances
        var instanceInfo: InstanceInfo? = null
        if (instances != null && instances.size > 0) {
            instanceInfo = instances[0]
        }
        val ipAddr: String? = instanceInfo?.ipAddr
        val servicePort: String? = instanceInfo?.metadata?.get("port")
...

Then, we use the data retrieved to build the full URI to the server:

...
val serviceUrl = "$ipAddr:$servicePort"
val path = "akka.tcp://RemoteWorkerSystem@$serviceUrl/user/serverActor"
...
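
The path construction and the null check that guards it can be folded into one small function. This is only a sketch: remoteActorPath is a hypothetical helper, not part of the project, and it treats blank values as missing, which may be stricter than the notNull check used in the actor:

```kotlin
// Builds the remote actor path, or returns null when Eureka gave no usable coordinates.
// "RemoteWorkerSystem" and "serverActor" are the names used in the path above.
fun remoteActorPath(ipAddr: String?, servicePort: String?): String? {
    if (ipAddr.isNullOrBlank() || servicePort.isNullOrBlank()) return null
    return "akka.tcp://RemoteWorkerSystem@$ipAddr:$servicePort/user/serverActor"
}
```

A null result corresponds to the branch where the client has to wait and ask Eureka again.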

With that, we are now able to send the request to the server actor:

...
context.actorSelection(path).tell(Identify(path), self)
...

We also schedule a ReceiveTimeout message to be delivered to the actor itself after 5 seconds; if the server has not replied by then, the client is still inactive and the timeout triggers a new attempt.

Last, if we could not retrieve the server’s coordinates from the EurekaClient, we schedule another attempt in five seconds:

...
Timer().schedule(object : TimerTask() {
                override fun run() {
                    logger.info("Retry....")
                    sendIdentifyRequest()
                }
            }, 5000)
...
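
The retry loop itself can be sketched outside the actor. The version below swaps the Timer for a ScheduledExecutorService, which is my substitution and not what the project uses; retryUntilFound and its parameters are hypothetical names. It calls a lookup immediately and, as long as the lookup returns null, schedules itself again after the given delay:

```kotlin
import java.util.concurrent.ScheduledExecutorService
import java.util.concurrent.TimeUnit

// Calls lookup() now and, while it returns null, again after every delay.
// onFound receives the first non-null result, on the thread that obtained it.
fun <T : Any> retryUntilFound(
    scheduler: ScheduledExecutorService,
    delayMillis: Long,
    lookup: () -> T?,
    onFound: (T) -> Unit
) {
    val result = lookup()
    if (result != null) {
        onFound(result)
    } else {
        // Explicit Runnable avoids ambiguity with the Callable overload of schedule().
        scheduler.schedule(
            Runnable { retryUntilFound(scheduler, delayMillis, lookup, onFound) },
            delayMillis,
            TimeUnit.MILLISECONDS
        )
    }
}
```

Inside an actor, scheduling a message to self through the Akka scheduler (as done for the ReceiveTimeout) keeps the retry on the actor's own message loop, so the sketch is meant to illustrate the loop, not to replace that mechanism.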

I have defined a couple of functions that are executed on message reception, depending on whether the ClientActor is connected to the ServerActor or not:

...
internal var inactive: Receive  = receiveBuilder()
            .match(ActorIdentity::class.java) {
                identity ->
                remoteActor = identity.ref
                if (remoteActor == null) {
                    logger.warning("Remote actor not available: " + identity.correlationId())
                } else {
                    context.watch(remoteActor)
                    context.become(active, true)
                }
            }
            .match(AssociatedEvent::class.java) {
                associatedEvent ->
                logger.warning("AssociatedEvent ${associatedEvent.remoteAddress}")
            }
            .match(ReceiveTimeout::class.java) {
                x ->
                logger.warning("ReceiveTimeout : " + x)
                sendIdentifyRequest()
            }
            .build()
...

While the client is not connected, it will retry the connection (calling sendIdentifyRequest) on every ReceiveTimeout.
On connection establishment, the client will

  1. receive an ActorIdentity with the ref (reference to the other actor) populated
  2. begin to watch the remote actor
  3. switch to the active function to manage the incoming messages
...
 internal var active: Receive = receiveBuilder()
            .match(AkkaMessage::class.java) { s ->
                logger.info("Received " + s)
                remoteActor?.tell(s, self)
                pendingMessages.put(s.id, s)
            }
...

When the client receives an AkkaMessage, it

  1. sends the message to the remote actor
  2. adds the message to the map of sent messages awaiting an answer
...
            .match(AkkaResponse::class.java) { s ->
                logger.info("Received " + s)
                pendingMessages.remove(s.id)
                logger.info("We still have ${pendingMessages.size} pending messages")
            }
...

When the client receives an AkkaResponse, it removes the corresponding AkkaMessage from the map of sent messages.
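
The bookkeeping done with pendingMessages can be isolated in a few lines. The sketch below uses hypothetical, trimmed-down types (Request, Response, PendingTracker) in place of AkkaMessage, AkkaResponse and the actor's map; it shows how the id ties an asynchronous reply back to the request that caused it:

```kotlin
// Hypothetical, trimmed-down versions of the two message types.
data class Request(val id: Int, val payload: String)
data class Response(val id: Int, val done: Boolean)

// Tracks requests that were sent but not yet answered, keyed by message id.
class PendingTracker {
    private val pending = HashMap<Int, Request>()

    // Number of requests still waiting for a reply.
    val size: Int get() = pending.size

    // Remember a request right after handing it to the remote side.
    fun sent(request: Request) {
        pending[request.id] = request
    }

    // Match a response to its request; returns null for unknown or duplicate ids.
    fun answered(response: Response): Request? = pending.remove(response.id)
}
```

Replies may arrive in any order: answering id 2 before id 1 still removes the right entry, and a second reply with the same id simply returns null.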

...
            .match(Terminated::class.java) {
                terminated ->
                logger.warning("ActorServer terminated")
                context.unwatch(remoteActor)
                context.become(inactive, true)
                sendIdentifyRequest()
            }
           ...
            .match(DisassociatedEvent::class.java) {
                disassociatedEvent ->
                logger.warning("DisassociatedEvent ${disassociatedEvent.remoteAddress}")
                logger.info("unbecome $self")
                context.unwatch(remoteActor)
                context.become(inactive, true)
                sendIdentifyRequest()
            }
            .build()
...

When the client receives a Terminated or DisassociatedEvent message, it

  1. stops watching the remote actor
  2. switches to the inactive function to manage the incoming messages
  3. retries the connection to the ServerActor (calling sendIdentifyRequest)
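
The switch between the two behaviours can be pictured as a tiny state machine. The sketch below contains no Akka code; the event and class names are hypothetical simplifications (ServerIdentified for a populated ActorIdentity, ServerLost for Terminated or DisassociatedEvent, Work for AkkaMessage), and a counter stands in for the call to sendIdentifyRequest:

```kotlin
// Events the sketch reacts to; simplified stand-ins for the Akka messages.
sealed class Event
object ServerIdentified : Event()
object ServerLost : Event()
data class Work(val id: Int) : Event()

enum class State { INACTIVE, ACTIVE }

class ClientStateMachine {
    var state = State.INACTIVE
        private set

    // Ids of the work items that were forwarded to the server.
    val forwarded = mutableListOf<Int>()

    // Counts how many times a reconnection was requested.
    var identifyRequests = 0
        private set

    fun handle(event: Event) {
        when (state) {
            // While inactive only the identification matters; work is dropped.
            State.INACTIVE -> if (event is ServerIdentified) state = State.ACTIVE
            State.ACTIVE -> when (event) {
                is Work -> forwarded.add(event.id)
                is ServerLost -> {
                    state = State.INACTIVE
                    identifyRequests++
                }
                is ServerIdentified -> Unit
            }
        }
    }
}
```

Work that arrives while the machine is inactive is dropped, which mirrors the inactive Receive shown earlier having no match for AkkaMessage.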

There is one point that needs some clarification. Inside the sendIdentifyRequest() function we are actually asking the Eureka registry for the first available instance of an actorserverservice: it could be the same instance we were connected to before, or a different one.
I think this is a little bit “naive”, since we are trying to recreate something that Akka itself provides out of the box: Akka Clustering. We will switch to it in the next part of the tutorial, I promise!

With the client side of the conversation set up, we only have to adjust the ServerActor, which we left with a pretty useless implementation:
net/microservices/tutorial/actorserverservice/actors/ServerActor.kt:

...
 override fun createReceive(): Receive {
        return ReceiveBuilder()
                .match(String::class.java, { s -> s != "OK" }) { s ->
                    logger.info("Received " + s)
                    sender.tell("OK", self)
                }
                .match(AkkaMessage::class.java) {
                    message ->
                    logger.info("Received $message" )
                    val response = executeCommand(message)
                    logger.info("Sending $response" )
                    sender.tell(response, self)
                }
                .build()
    }

    private fun executeCommand(message: AkkaMessage): AkkaResponse {
        var done: Boolean = false
        when (message.command) {
            Command.CREATE -> {
                if (!users.contains(message.user.id)) {
                    users.put(message.user.id!!, message.user)
                    done = true
                }
            }
            Command.READ -> {
                if (users.contains(message.user.id)) {
                    done = true
                }
            }
            Command.UPDATE -> {
                if (users.contains(message.user.id)) {
                    done = true
                }
            }
            Command.DELETE -> {
                if (users.contains(message.user.id)) {
                    users.remove(message.user.id!!)
                    done = true
                }
            }
        }
        return AkkaResponse(done, message.id)
    }
...

When the actor receives an AkkaMessage, it tries to execute the received command with the included user. The class contains a Map with the users currently created: reading, updating or deleting a user whose id is not registered (READ/UPDATE/DELETE) produces a “fail” response, and so does an attempt to create a user whose id is already registered. Otherwise, a “success” response is sent.
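
The same success and failure rules can be tried out in isolation. The sketch below is not the ServerActor code: StoreCommand, StoredUser and UserStore are hypothetical stand-ins, and, unlike the UPDATE branch above (which only checks that the id exists), it also stores the new value on UPDATE, which is my assumption about the intended behaviour:

```kotlin
// Hypothetical stand-ins: the real Command and UserDTO live in servicecommon.
enum class StoreCommand { CREATE, READ, UPDATE, DELETE }
data class StoredUser(val id: Int, val name: String)

// In-memory user store following the success/failure rules described above.
class UserStore {
    private val users = HashMap<Int, StoredUser>()

    // Returns true on success, false when the precondition on the id does not hold.
    fun execute(command: StoreCommand, user: StoredUser): Boolean = when (command) {
        // CREATE fails if the id is already registered.
        StoreCommand.CREATE -> if (user.id in users) false else { users[user.id] = user; true }
        // READ succeeds only for a registered id.
        StoreCommand.READ -> user.id in users
        // UPDATE replaces the stored user, but only for a registered id.
        StoreCommand.UPDATE -> if (user.id in users) { users[user.id] = user; true } else false
        // DELETE succeeds only if something was actually removed.
        StoreCommand.DELETE -> users.remove(user.id) != null
    }

    // Looks up the stored user for an id, or null if none is registered.
    fun find(id: Int): StoredUser? = users[id]
}
```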

image creation

Let’s update the docker configuration.
docker/pom.xml:

...
  <properties>
        <!-- PLUGINS -->
        <docker.maven.plugin.fabric8.version>0.21.0</docker.maven.plugin.fabric8.version>
        <!-- CONFIGURATIONS -->
        <docker.repo>(YOUR-REPO-NAME)</docker.repo>
        <!-- services properties -->
        <registration.service.port>1111</registration.service.port>
        <persistence.service.port>2222</persistence.service.port>
        <timeconsuming.service.port>3333</timeconsuming.service.port>
        <configuration.service.port>4444</configuration.service.port>
        <actorserver.service.port>5555</actorserver.service.port>
        <actorclient.service.port>6666</actorclient.service.port>
        <!-- Images to build and run - default to all-->
        <images>registrationservice, persistenceservice, timeconsumingservice, configurationservice,
            actorserverservice, actorclientservice
        </images>
    </properties>

    <profiles>
        <profile>
            <id>configuration</id>
            <properties>
                <images>registrationservice, persistenceservice, timeconsumingservice, configurationservice
                </images>
            </properties>
        </profile>
        <profile>
            <id>akka</id>
            <properties>
                <images>registrationservice, actorserverservice, actorclientservice
                </images>
            </properties>
        </profile>
    </profiles>

    <dependencies>
        ...
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>actorclientservice</artifactId>
            <version>0.1</version>
        </dependency>
    </dependencies>
...

You will find the ClientServerService image in the sources.

After installing the actorclientservice artifact and starting docker with the akka profile, we should have the actorclient service registered and the image running.

Now we can also see the logs of the running containers with:

mvn docker:logs -P akka

In the logs, you will see the ClientActor sending messages and the ServerActor replying to them.

Conclusion

Well, you will find the image definition (and all the other missing stuff) in the project:

git clone https://github.com/gitgabrio/container-microservices-tutorial-6.git

In the next part we will add the Akka Cluster implementation so… stay tuned!!!

Any comments and suggestions will be greatly appreciated!!!
