Welcome

Welcome all!
This is my technical blog. I’ll use it to share my experience in the development world.
Hope you may found it interesting!!!!

Advertisements

Tutorial on Microservices with Kotlin, SpringBoot, Akka and Docker – part 7

Intro

In the last post we implemented the akkaclient service, so to have a simple Akka message exchange. What was missing is the AkkaCluster implementation, the topic of this last part of the tutorial.
My goal overall goal is to integrate as much as possible the “Akka” world with the “Eureka” one, having only one main “configuration” point, represented by the “RegistrationService”: akka services will have to retrieve cluster informations from it.
As always you may download the full code from the repository:

 git clone https://github.com/gitgabrio/container-microservices-tutorial-7.git

akkacluster-service

Let’s add the module declaration to the parent pom:

...
   </parent>
    <modules>
        <module>servicecommon</module>
        <module>registrationservice</module>
        <module>persistenceservice</module>
        <module>timeconsumingservice</module>
        <module>configurationservice</module>
        <module>akkaclusterservice</module>
        <module>actorserverservice</module>
        <module>actorclientservice</module>
        <module>docker</module>
    </modules>
    <properties>
...

This is the akkacluster-service pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <parent>
  <groupId>net.microservices.tutorial</groupId>
  <artifactId>container-microservice</artifactId>
  <version>0.1</version>
  <relativePath>../</relativePath>
 </parent>
 <modelVersion>4.0.0</modelVersion>

 <artifactId>akkaclusterservice</artifactId>
 <properties>
  <akka.version>2.5.3</akka.version>
  <!-- CONFIGURATIONS -->
  <start-class>net.microservices.tutorial.akkaclusterservice.AkkaClusterServer</start-class>
  <finalName>akkaclusterservice-${project.version}</finalName>
  <service.port>7777</service.port>
  <rs.port>1111</rs.port>
  <rs.ip>localhost</rs.ip>
  <akka.port>12550</akka.port>
  <seed.port1>22552</seed.port1>
  <seed.port2>22553</seed.port2>
 </properties>
 <build>
  <finalName>${finalName}</finalName>
 </build>
 <dependencies>
  <dependency>
   <groupId>net.microservices.tutorial</groupId>
   <artifactId>servicecommon</artifactId>
   <version>0.1</version>
  </dependency>
  <dependency>
   <groupId>com.typesafe.akka</groupId>
   <artifactId>akka-actor_2.11</artifactId>
   <version>${akka.version}</version>
  </dependency>
  <dependency>
   <groupId>com.typesafe.akka</groupId>
   <artifactId>akka-remote_2.11</artifactId>
   <version>${akka.version}</version>
  </dependency>
  <dependency>
   <groupId>com.typesafe.akka</groupId>
   <artifactId>akka-cluster_2.11</artifactId>
   <version>${akka.version}</version>
  </dependency>
  <dependency>
   <groupId>com.typesafe.akka</groupId>
   <artifactId>akka-cluster-metrics_2.11</artifactId>
   <version>${akka.version}</version>
  </dependency>
 </dependencies>
</project>

As you may see, it is almost the same as the other akka modules, except for the akka-cluster specific dependencies. Please note that these dependencies have been added also to the actorserverservice and actorclientservice modules.

application-conf


akka {
 actor {
  provider = "cluster"
  serializers {
   proto = "akka.remote.serialization.ProtobufSerializer"
  }
 }
 remote {
  netty.tcp {
   hostname = localhost
   port = 4324
  }
  retry-gate-closed-for = 10 s
 }

 cluster {
  seed-nodes = [
   "akka.tcp://ClusterSystem@127.0.0.1:2551",
   "akka.tcp://ClusterSystem@127.0.0.1:2552"]
 }
 log-sent-messages = on
 log-received-messages = on

# The length of time to gate an address whose name lookup has failed
# or has explicitly signalled that it will not accept connections
# (remote system is shutting down or the requesting system is quarantined).
# No connection attempts will be made to an address while it remains
# gated. Any messages sent to a gated address will be directed to dead
# letters instead. Name lookups are costly, and the time to recovery
# is typically large, therefore this setting should be a value in the
# order of seconds or minutes.
 gate-invalid-addresses-for = 10 s
}

The application-conf has been modified to use “cluster” as provider. This modification has been done also to the other akka modules. Specific for akkacluster is the cluster configuration

...
cluster {
  seed-nodes = [
   "akka.tcp://ClusterSystem@127.0.0.1:2551",
   "akka.tcp://ClusterSystem@127.0.0.1:2552"]
 }
...

where we define the seed-nodes address. Here you may read the general instructions for Cluster configuration.

With the above in files in place, we could already start our bare-bone cluster. With the following code we will add some functionalities to it.

code

We want to show in the home page some informations about the cluster, namely:
1) Registered Nodes status
2) Cluster Metrics (CPU and memory usage)

The controller will take this informations from the autowired service class and will inject them in the template:
net/microservices/tutorial/akkaclusterservice/controllers/HomeController.kt

...
@Controller
class HomeController {

    @Autowired
    var homeService: HomeService? = null

    @RequestMapping("/")
    fun home(model: Model): String {
        val heapMemory = homeService?.heapMemory ?: "UNKNOWN"
        val processors = homeService?.processors ?: "UNKNOWN"
        val loadAverage = homeService?.loadAverage ?: "UNKNOWN"
        val nodes = homeService?.nodeStatusMap ?: mutableMapOf()
        model.addAttribute("heapMemory", heapMemory)
        model.addAttribute("processors", processors)
        model.addAttribute("loadAverage", loadAverage)
        model.addAttribute("nodes", nodes)
        return "index"
    }

}

The “Service” class will be used to store this informations

net/microservices/tutorial/akkaclusterservice/services/HomeService.kt

...
@Service
open class HomeService {

    enum class NODE_STATUS {
        UP,
        UNREACHABLE,
        REMOVED
    }
    var heapMemory: Double = 0.00
    var processors: Int = 0
    var loadAverage: Double = 0.00
    var nodeStatusMap: MutableMap<Address, NODE_STATUS> = mutableMapOf()
}

For the Nodes status we have defined an enum and a Map with the Node‘s address and status. Using the Address as key allow us to have only one entry for Nodes restarted on the same address.
The other variables are used for the metrics.

Now, we have to populate all this values.

First of all we are going to monitor when Nodes are being added/removed. For this, we create an Actor that registers itself to cluster events and reacts to specific messages:

net/microservices/tutorial/akkaclusterservice/actors/ClusterListenerActor.kt

class ClusterListenerActor(val homeService: HomeService) : AbstractActor() {
...

 //subscribe to cluster changes
 override fun preStart() {
  cluster.subscribe(self(), ClusterEvent.initialStateAsEvents(),
   MemberEvent::class.java, UnreachableMember::class.java)
 }

 //re-subscribe when restart
 override fun postStop() {
  cluster.unsubscribe(self())
 }

 override fun createReceive(): AbstractActor.Receive {
        return receiveBuilder()
                .match(MemberUp::class.java) { mUp ->
                    log.info("Member is Up: {}", mUp.member())
                    homeService.nodeStatusMap.put(mUp.member().uniqueAddress().address(), HomeService.NODE_STATUS.UP)
                }
                .match(UnreachableMember::class.java) { mUnreachable ->
                    log.info("Member detected as unreachable: {}", mUnreachable.member())
                    homeService.nodeStatusMap.put(mUnreachable.member().uniqueAddress().address(), HomeService.NODE_STATUS.UNREACHABLE)
                }
                .match(MemberRemoved::class.java) { mRemoved ->
                    log.info("Member is Removed: {}", mRemoved.member())
                    homeService.nodeStatusMap.put(mRemoved.member().uniqueAddress().address(), HomeService.NODE_STATUS.REMOVED)
                }
                .match(MemberEvent::class.java) { message ->
                    log.info("Member is {}: {}", message.member().status(), message.member())
            // ignore
                }
                .build()
    }
}

Whenever a new Node registers itself to the cluster or change its status, the actor updates HomeService’s nodeStatusMap accordingly.

Next, we want to show some metrics for the cluster. For this, we implement another actor that reacts to ClusterMetricsChanged messages:

net/microservices/tutorial/akkaclusterservice/actors/MetricsListenerActor.kt:

 ...
class MetricsListenerActor(val homeService: HomeService) : AbstractActor() {
 ...

  // Subscribe unto ClusterMetricsEvent events.
  override fun preStart() {
   extension.subscribe(self())
  }

  // Unsubscribe from ClusterMetricsEvent events.
  override fun postStop() {
   extension.unsubscribe(self())
  }

  override fun createReceive(): AbstractActor.Receive {
   return receiveBuilder()
    .match(ClusterMetricsChanged::class.java) { clusterMetrics ->
      for (nodeMetrics in clusterMetrics.nodeMetrics) {
       if (nodeMetrics.address() == cluster.selfAddress()) {
        logHeap(nodeMetrics)
        logCpu(nodeMetrics)
       }
      }
    }
    .match(CurrentClusterState::class.java) { message ->
      // Ignore.
    }
    .build()
  }

  internal fun logHeap(nodeMetrics: NodeMetrics) {
   val heap = StandardMetrics.extractHeapMemory(nodeMetrics)
    if (heap != null) {
     val heapMemory = heap.used().toDouble() / 1024.0 / 1024.0
     homeService.heapMemory = heapMemory
   }
  }

  internal fun logCpu(nodeMetrics: NodeMetrics) {
    val cpu = StandardMetrics.extractCpu(nodeMetrics)
     if (cpu != null && cpu.systemLoadAverage().isDefined) {
      homeService.processors = cpu.processors()
      homeService.loadAverage = cpu.systemLoadAverage().get() as Double
  }
 }

This actor will modify the others HomeService’s variables.

All this functionalities are orchestrated inside the configuration:
net/microservices/tutorial/akkaclusterservice/configurations/AkkaClusterConfiguration.kt:

@Configuration
@EnableDiscoveryClient
@ComponentScan("net.microservices.tutorial.akkaclusterservice")
open class AkkaClusterConfiguration {

    private val SYSTEM_NAME = "ClusterSystem"

    @Value("\${eureka.instance.metadata-map.port}")
    var akkaPort: Int = 0

    @Value("\${eureka.instance.metadata-map.seed_port1}")
    var seedPort1: Int = 0

    @Value("\${eureka.instance.metadata-map.seed_port2}")
    var seedPort2: Int = 0

    @Autowired
    private val eurekaClient: EurekaClient? = null


    @Bean
    open fun akkaClusterSystem(homeService: HomeService): ActorSystem {
        startClusterListener(seedPort1, "clusterListenerActor1", homeService)
        startClusterListener(seedPort2, "clusterListenerActor2", homeService)
        val defaultApplication: Config = getConfig(akkaPort)
        val toReturn = ActorSystem.create(SYSTEM_NAME, defaultApplication)
        return toReturn
    }

    @Bean
    open fun metricsListenerActor(akkaClusterSystem: ActorSystem, homeService: HomeService): ActorRef {
        val toReturn = akkaClusterSystem.actorOf(Props.create(MetricsListenerActor::class.java, homeService), "metricsListener")
        return toReturn
    }

    @Bean
    open fun homeController(): HomeController {
        return HomeController()
    }

    private fun startClusterListener(seedPort: Int, actorName: String, homeService: HomeService) {
        val defaultApplication: Config = getConfig(seedPort)
        val system = ActorSystem.create(SYSTEM_NAME, defaultApplication)
        system.actorOf(Props.create(ClusterListenerActor::class.java, homeService), actorName)
    }

    private fun getConfig(tcpPort: Int): Config {
        val hostName = eurekaClient?.applicationInfoManager?.info?.ipAddr ?: "hostname"
        val seedNodes = listOf("akka.tcp://$SYSTEM_NAME@$hostName:$seedPort1", "akka.tcp://$SYSTEM_NAME@$hostName:$seedPort2")
        val toReturn: Config = ConfigFactory.defaultApplication()
                .withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(hostName))
                .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(tcpPort))
                .withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromIterable(seedNodes))
        return toReturn
    }

}

What’s interesting here is the Cluster startup.
First we instantiate two ClusterListenerActors, one for each seed port

...

    @Bean
    open fun akkaClusterSystem(homeService: HomeService): ActorSystem {
        startClusterListener(seedPort1, "clusterListenerActor1", homeService)
        startClusterListener(seedPort2, "clusterListenerActor2", homeService)
      ...
    }
...

    private fun startClusterListener(seedPort: Int, actorName: String, homeService: HomeService) {
        val defaultApplication: Config = getConfig(seedPort)
        val system = ActorSystem.create(SYSTEM_NAME, defaultApplication)
        system.actorOf(Props.create(ClusterListenerActor::class.java, homeService), actorName)
    }
...

and then we start the cluster with the two seed nodes:

...
    @Bean
    open fun akkaClusterSystem(homeService: HomeService): ActorSystem {
       ...
        val defaultApplication: Config = getConfig(akkaPort)
        val toReturn = ActorSystem.create(SYSTEM_NAME, defaultApplication)
        return toReturn
    }

   ...

    private fun getConfig(tcpPort: Int): Config {
        val hostName = eurekaClient?.applicationInfoManager?.info?.ipAddr ?: "hostname"
        val seedNodes = listOf("akka.tcp://$SYSTEM_NAME@$hostName:$seedPort1", "akka.tcp://$SYSTEM_NAME@$hostName:$seedPort2")
        val toReturn: Config = ConfigFactory.defaultApplication()
                .withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(hostName))
                .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(tcpPort))
                .withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromIterable(seedNodes))
        return toReturn
    }

Note that we are retrieving the hostName from the eurekaClient, so that other Akka services will be able to retrieve it through the Registration.

To see how, this is how the ActorServerConfiguration has been modified:
net/microservices/tutorial/actorserverservice/configurations/ActorServerConfiguration.kt:

@Configuration
@EnableDiscoveryClient
@ComponentScan("net.microservices.tutorial.actorserverservice")
open class ActorServerConfiguration {

    private val SYSTEM_NAME = "ClusterSystem"

    private var logger = Logger.getLogger(ActorServerConfiguration::class.java.simpleName)

    @Value("\${eureka.instance.metadata-map.port}")
    var akkaPort: Int = 0

    @Value("\${akkaclusterservicename.name}")
    var akkaClusterServiceName: String? = null

    @Autowired
    private val eurekaClient: EurekaClient? = null

    @Bean
    open fun actorSystem(): ActorSystem? {
        val instanceInfo = getRemoteInstanceInfo()
        if (instanceInfo != null) {
            logger.info("instanceInfo ${instanceInfo.hostName} ")
            return getSystem(instanceInfo)
        } else {
            logger.severe("instanceInfo NOT FOUND ")
            return null
        }
    }

    private fun getSystem(instanceInfo: InstanceInfo): ActorSystem {
        val defaultConfiguration = getDefaultConfiguration(instanceInfo)
        val system = ActorSystem.create(SYSTEM_NAME, defaultConfiguration)
        system.actorOf(Props.create(ServerActor::class.java), "serverActor")
        return system
    }

    private fun getRemoteInstanceInfo(): InstanceInfo? {
        logger.info("akkaClusterServiceName $akkaClusterServiceName")
        val akkaClusterService: Application? = eurekaClient?.getApplication(akkaClusterServiceName)
        logger.info("akkaClusterService ? " + akkaClusterService!!.name)
        akkaClusterService?.shuffleAndStoreInstances(true)
        val instances: List<InstanceInfo>? = akkaClusterService?.instances
        var toReturn: InstanceInfo? = null
        if (instances != null && instances.size > 0) {
            toReturn = instances[0]
        }
        return toReturn
    }

    private fun getDefaultConfiguration(instanceInfo: InstanceInfo): Config {
        val hostName = eurekaClient?.applicationInfoManager?.info?.ipAddr ?: "hostname"
        logger.info("hostName $hostName ")
        val clusterServiceHost = instanceInfo.ipAddr
        logger.info("clusterServiceHost $clusterServiceHost")
        val seedNodes = instanceInfo.metadata.filter { kv -> kv.key.startsWith("seed") }.map { kv -> "akka.tcp://$SYSTEM_NAME@$clusterServiceHost:${kv.value}" }
        seedNodes.forEach {
            seedNode -> logger.info("seedNode $seedNode")
        }
        var toReturn: Config = ConfigFactory.defaultApplication()
                .withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(hostName))
                .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(akkaPort))
        if (!seedNodes.isEmpty()) {
            toReturn = toReturn
                    .withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromIterable(seedNodes))
        }
        return toReturn
    }
}

First, we find the InstanceInfo of the ClusterService:

...

    @Bean
    open fun actorSystem(): ActorSystem? {
        val instanceInfo = getRemoteInstanceInfo()
        ...
    }

    ...

    private fun getRemoteInstanceInfo(): InstanceInfo? {
        logger.info("akkaClusterServiceName $akkaClusterServiceName")
        val akkaClusterService: Application? = eurekaClient?.getApplication(akkaClusterServiceName)
        logger.info("akkaClusterService ? " + akkaClusterService!!.name)
        akkaClusterService?.shuffleAndStoreInstances(true)
        val instances: List<InstanceInfo>? = akkaClusterService?.instances
        var toReturn: InstanceInfo? = null
        if (instances != null && instances.size > 0) {
            toReturn = instances[0]
        }
        return toReturn
    }
...

With this we can then retrieve the address of the remote cluster:

...

    @Bean
    open fun actorSystem(): ActorSystem? {
       ...
            logger.info("instanceInfo ${instanceInfo.hostName} ")
            return getSystem(instanceInfo)
        ...
    }

    private fun getSystem(instanceInfo: InstanceInfo): ActorSystem {
        val defaultConfiguration = getDefaultConfiguration(instanceInfo)
        val system = ActorSystem.create(SYSTEM_NAME, defaultConfiguration)
        system.actorOf(Props.create(ServerActor::class.java), "serverActor")
        return system
    }

   ...

    private fun getDefaultConfiguration(instanceInfo: InstanceInfo): Config {
        val hostName = eurekaClient?.applicationInfoManager?.info?.ipAddr ?: "hostname"
        logger.info("hostName $hostName ")
        val clusterServiceHost = instanceInfo.ipAddr
        logger.info("clusterServiceHost $clusterServiceHost")
        val seedNodes = instanceInfo.metadata.filter { kv -> kv.key.startsWith("seed") }.map { kv -> "akka.tcp://$SYSTEM_NAME@$clusterServiceHost:${kv.value}" }
        seedNodes.forEach {
            seedNode -> logger.info("seedNode $seedNode")
        }
        var toReturn: Config = ConfigFactory.defaultApplication()
                .withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(hostName))
                .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(akkaPort))
        if (!seedNodes.isEmpty()) {
            toReturn = toReturn
                    .withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromIterable(seedNodes))
        }
        return toReturn
    }
}

The parameter instanceInfo passed to getDefaultConfiguration is actually the InstanceInfo of the ClusterService, from which we retrieve the ipAddress and the seedNodes ports.

When the ServerActor starts, it registers itself to the cluster, and when stops, it unregisters:
net/microservices/tutorial/actorserverservice/actors/ServerActor.kt:

...
//subscribe to MemberUp events
    override fun preStart() {
        cluster.subscribe(self(), ClusterEvent.MemberUp::class.java)
    }

    //re-subscribe when restart
    override fun postStop() {
        cluster.unsubscribe(self())
    }
...

Moreover, it listen for ClusterEvent, so that whenever an Akka Member with the Client role join the cluster, the ServerActor send to it a ServerActorRegistration message:

...
 override fun createReceive(): Receive {
        return ReceiveBuilder()
                ...
                .match(ClusterEvent.CurrentClusterState::class.java) {
                    state ->
                    state.members
                            .filter {
                                member ->
                                member.status().equals(MemberStatus.up())
                            }
                            .forEach {
                                member ->
                                register(member)
                            }
                }
                .match(ClusterEvent.MemberUp::class.java) {
                    mUp -> register(mUp.member())
                }
                .build()
    }

    ...

    private fun register(member: Member) {
        if (member.hasRole("Client")) {
            context.actorSelection("${member.address()}/user/clientActor")
                    .tell(ServerActorRegistration(), self())
        }
    }
...

In this way, the ClientActor will have a reference of the ServerActor to talk with.

The ActorClientService configuration is almost the same as of the ActoreServerService‘s one.
The ClientActor is simpler then before, though, since it does not have to retrieve the information about the ServerActor.
When it is in inactive state, it just listen and wait for an ActorServer to appear:
net/microservices/tutorial/actorclientservice/actors/ClientActor.kt:

...
  internal var inactive: Receive = receiveBuilder()
            .match(ServerActorRegistration::class.java) {
                remoteActor = sender
                context.watch(remoteActor)
                context.become(active, true)
...

When an ActorServer joins the cluster, the client switch to the active state, and begin exchanging messages with it:

 internal var active: Receive = receiveBuilder()
            .match(AkkaMessage::class.java) { s ->
                logger.info("Received " + s)
                remoteActor?.tell(s, self)
                pendingMessages.put(s.id, s)
            }
            .match(AkkaResponse::class.java) { s ->
                logger.info("Received " + s)
                pendingMessages.remove(s.id)
                logger.info("We still have ${pendingMessages.size} pending messages")
            }
            ...

image creation

Let’s update the docker configuration.
docker/pom.xml:

...
   <properties>
        <!-- PLUGINS -->
        <docker.maven.plugin.fabric8.version>0.21.0</docker.maven.plugin.fabric8.version>
        <!-- CONFIGURATIONS -->
        <docker.repo>(YOUR-REPO-NAME)</docker.repo>
        <!-- services properties -->
        <registration.service.port>1111</registration.service.port>
        <persistence.service.port>2222</persistence.service.port>
        <timeconsuming.service.port>3333</timeconsuming.service.port>
        <configuration.service.port>4444</configuration.service.port>
        <actorserver.service.port>5555</actorserver.service.port>
        <actorclient.service.port>6666</actorclient.service.port>
        <akkacluster.service.port>7777</akkacluster.service.port>
        <!-- Images to build and run - default to all-->
        <images>registrationservice, persistenceservice, timeconsumingservice, configurationservice,
            actorserverservice, actorclientservice, akkaclusterservice
        </images>
    </properties>

    <profiles>
        <profile>
            <id>configuration</id>
            <properties>
                <images>registrationservice, persistenceservice, timeconsumingservice, configurationservice
                </images>
            </properties>
        </profile>
        <profile>
            <id>akka</id>
            <properties>
                <images>registrationservice, akkaclusterservice, actorclientservice, actorserverservice
                </images>
            </properties>
        </profile>
    </profiles>

    <dependencies>
        ...
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>akkaclusterservice</artifactId>
            <version>0.1</version>
        </dependency>
    </dependencies>
...

You will find the AkkaClusterService image in the sources.

After installing the akkaclusterservice artifact and starting docker with the akka profile,
we should have all the akka services registered:

and the akkacluster service running here
.

Last, in the ActorServerService service a new rest url has been added to show the currently registered users:

You may find it here

Conclusion

You will find the image definition (and all the other missing stuff) in the project:

git clone https://github.com/gitgabrio/container-microservices-tutorial-7.git

Final Thoughts

Well, it has been an interesting tutorial, at least for me.
I hope you liked reading it as much as I liked writing it.
I would like to thanks those friends that helped me: I owe you a beer!!!
And, as always…

any comment and suggestion will be greatly appreciated!!!

Tutorial on Microservices with Kotlin, SpringBoot, Akka and Docker – part 6

Intro

In the last post we begun touching Akka with the akkaserver service. Now, we will add the companion service, the akkaclient. We will simulate some CRUD requests made by the client to the server, and we will use the same UserDTO defined for the persistenceservice.
Before proceeding, I would suggest some readings about the Actor Model in general and Akka more specifically for those who does not know them.
As always you may download the full code from the repository:

 git clone https://github.com/gitgabrio/container-microservices-tutorial-6.git

actorclient-service

Let’s add the module declaration to the parent pom:

...
   </parent>
    <modules>
        <module>servicecommon</module>
        <module>registrationservice</module>
        <module>persistenceservice</module>
        <module>timeconsumingservice</module>
        <module>configurationservice</module>
        <module>actorserverservice</module>
        <module>actorclientservice</module>
        <module>docker</module>
    </modules>
    <properties>
...

This is the actorclient-service pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>container-microservice</artifactId>
        <groupId>net.microservices.tutorial</groupId>
        <version>0.1</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>actorclientservice</artifactId>
    <properties>
        <akka.version>2.5.3</akka.version>
        <!-- CONFIGURATIONS -->
        <start-class>net.microservices.tutorial.actorclientservice.ActorClientServer</start-class>
        <finalName>actorclientservice-${project.version}</finalName>
        <service.port>6666</service.port>
        <rs.port>1111</rs.port>
        <rs.ip>localhost</rs.ip>
        <akka.port>12553</akka.port>
    </properties>

    <dependencies>
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>servicecommon</artifactId>
            <version>0.1</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.11</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.11</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>
</project>

As you may see, it is almost the same as the akkaserver‘s one.

code

To implement the communication between the two actors, we will define the actions (to simulate a full CRUD implementation) and a couple of DTOs that will be the actual messages exchanged between the services. Since both services will need this classes, we will put them in the servicecommon package:

net/microservices/tutorial/commands/Command.kt

...
enum class Command {
    CREATE,
    READ,
    UPDATE,
    DELETE
}

net/microservices/tutorial/messages/AkkaMessage.kt

...
open class AkkaMessage(val user: UserDTO, val command: Command, val id: Int) : Serializable {

    override fun toString(): String{
        return "AkkaMessage(command=$command, user=$user, id=$id)"
    }
}

net/microservices/tutorial/messages/AkkaResponse.kt

...
open class AkkaResponse(val done: Boolean, val id: Int): Serializable {
    override fun toString(): String{
        return "AkkaResponse(done=$done, id=$id)"
    }
}

Please note a couple of details:

  1. both classes implements Serializable to be transmitted (UserDTO is Serializable, too, via inheritance)
  2. the id field is used to map the the Response to the original Message, since the communication inside Akka is completely asynchronous

Now, we will simulate the situation where the ClientActor is driven by some other code to send CRUD operations to the server (similar to a frontend layer issuing database operations to the backend layer). The “frontend” role will be implemented inside the configuration class (pretty improper, I know):

net/microservices/tutorial/actorclientservice/configurations/ActorClientConfiguration.kt:

...
@Bean
    open fun actorSystem(): ActorSystem {
        val hostName = eurekaClient?.applicationInfoManager?.info?.ipAddr ?: "hostname"
        val defaultApplication: Config = ConfigFactory.defaultApplication()
                .withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(hostName))
                .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(akkaPort))
        val system = ActorSystem.create("Client", defaultApplication)
        val clientActor: ActorRef = system.actorOf(Props.create(ClientActor::class.java, eurekaClient!!, actorServerServiceName!!), "clientActor")
        val r: Random = Random()
        var counter: Int = 1
        system.scheduler().schedule(Duration.create(1, SECONDS),
                Duration.create(1, SECONDS), Runnable {
            val userDto : UserDTO = createUserDTO(r.nextInt(50))
            val command: Command = Command.values()[r.nextInt(4)]
            clientActor.tell(AkkaMessage(userDto, command, counter), null)
            counter ++
        }, system.dispatcher())
        return system
    }

We create a UserDTO with a random id between 0 and 49, and we also choose randomly the operation to execute:

...
val userDto : UserDTO = createUserDTO(r.nextInt(50))
val command: Command = Command.values()[r.nextInt(4)]
 ...

Then, we tell to the client actor to send a new AkkaMessage with:

  1. the UserDTO just created
  2. the Command to execute
  3. the progressive id of the message
...
clientActor.tell(AkkaMessage(userDto, command, counter), null)
 ...

The hard work is on done by the ClientActor:

net/microservices/tutorial/actorclientservice/actors/ClientActor.kt:

...
open class ClientActor(val eurekaClient: EurekaClient, val actorServerServiceName: String) : AbstractActor() {

    protected var logger = Logger.getLogger(ClientActor::class.java.simpleName)

    init {
        sendIdentifyRequest()
    }
...

    fun sendIdentifyRequest() {
        val actorServerApplication: Application? = eurekaClient.getApplication(actorServerServiceName)
        actorServerApplication?.shuffleAndStoreInstances(true)
        val instances: List<InstanceInfo>? = actorServerApplication?.instances
        var instanceInfo: InstanceInfo? = null
        if (instances != null && instances.size > 0) {
            instanceInfo = instances[0]
        }
        val ipAddr: String? = instanceInfo?.ipAddr
        val servicePort: String? = instanceInfo?.metadata?.get("port")
        if (notNull(ipAddr, servicePort)) {
            val serviceUrl = "$ipAddr:$servicePort"
            val path = "akka.tcp://RemoteWorkerSystem@$serviceUrl/user/serverActor"
            logger.info("Sending Identify Request to " + path)
            context.actorSelection(path).tell(Identify(path), self)
            context.system().scheduler()
                    .scheduleOnce(Duration.create(5, SECONDS), self(),
                            ReceiveTimeout.getInstance(), context.dispatcher(), self())
        } else {
            logger.warning("Cannot found remote path for $actorServerServiceName; retry in 5 seconds...")
            Timer().schedule(object : TimerTask() {
                override fun run() {
                    logger.info("Retry....")
                    sendIdentifyRequest()
                }
            }, 5000)
        }
    }

}

As soon as the class is instantiated we will send an Identify request to the AkkaServer. To do that, we first retrieve the server’s coordinates from the registrationservice using the EurekaCLient instance:

...
val actorServerApplication: Application? = eurekaClient.getApplication(actorServerServiceName)
        actorServerApplication?.shuffleAndStoreInstances(true)
        val instances: List<InstanceInfo>? = actorServerApplication?.instances
        var instanceInfo: InstanceInfo? = null
        if (instances != null && instances.size > 0) {
            instanceInfo = instances[0]
        }
        val ipAddr: String? = instanceInfo?.ipAddr
        val servicePort: String? = instanceInfo?.metadata?.get("port")
...

Then, we use the data retrieved to build the full URI to the server:

...
val serviceUrl = "$ipAddr:$servicePort"
val path = "akka.tcp://RemoteWorkerSystem@$serviceUrl/user/serverActor"
...

With that, we are now able to send the request to the server actor

...
context.actorSelection(path).tell(Identify(path), self)
...

We also schedule a ReceivedTimeout event to be fired after 5 seconds if the server does not reply to our request.

Last, if we did not retrieve the server’s coordinates from the EurekaClient, we schedule another attempt in five seconds

...
Timer().schedule(object : TimerTask() {
                override fun run() {
                    logger.info("Retry....")
                    sendIdentifyRequest()
                }
            }, 5000)
...

I have defined a couple of functions that will be executed on message receiving, depending if the ClientActor is connected with the ServerActor or not

...
internal var inactive: Receive  = receiveBuilder()
            .match(ActorIdentity::class.java) {
                identity ->
                remoteActor = identity.ref
                if (remoteActor == null) {
                    logger.warning("Remote actor not available: " + identity.correlationId())
                } else {
                    context.watch(remoteActor)
                    context.become(active, true)
                }
            }
            .match(AssociatedEvent::class.java) {
                associatedEvent ->
                logger.warning("AssociatedEvent ${associatedEvent.remoteAddress}")
            }
            .match(ReceiveTimeout::class.java) {
                x ->
                logger.warning("ReceiveTimeout : " + x)
                sendIdentifyRequest()
            }
            .build()
...

While the Client is not connected, it will re-try to connect (calling sendIdentifyRequest) on every ReceiveTimeout.
On connection establishment, the client will

  1. receive an ActorIdentity
  2. with the ref (reference to the other actor) populated

  3. begin to watch the remote actor
  4. switch to the active function to manage the incoming messages
...
 internal var active: Receive = receiveBuilder()
            .match(AkkaMessage::class.java) { s ->
                logger.info("Received " + s)
                remoteActor?.tell(s, self)
                pendingMessages.put(s.id, s)
            }
...

When the client receive an AkkaMessage message, it

  1. send the message to the remote actor
  2. add the message to the map of sent messages awaiting for an answer
...
            .match(AkkaResponse::class.java) { s ->
                logger.info("Received " + s)
                pendingMessages.remove(s.id)
                logger.info("We still have ${pendingMessages.size} pending messages")
            }
...

When the client receive an AkkaResponse message, it remove the corresponding AkkaMessage from the map of sent messages

...
            .match(Terminated::class.java) {
                terminated ->
                logger.warning("ActorServer terminated")
                context.unwatch(remoteActor)
                context.become(inactive, true)
                sendIdentifyRequest()
            }
           ...
            .match(DisassociatedEvent::class.java) {
                disassociatedEvent ->
                logger.warning("DisassociatedEvent ${disassociatedEvent.remoteAddress}")
                logger.info("unbecome $self")
                context.unwatch(remoteActor)
                context.become(inactive, true)
                sendIdentifyRequest()
            }
            .build()
...

When the client receive a Terminated or DisassociatedEvent message, it

  1. stop to watch the remote actor
  2. switch to the inactive function to manage the incoming messages
  3. re-try to connect to the ServerActor (calling sendIdentifyRequest)

There is one point that needs some clarification. Inside the sendIdentifyRequest() function we are actually asking to the Eureka registrar the first available instance of an actorserverservice: it could be the same or a different one respect the previous connected one.
I think this is a little bit “naive”, since we are trying to recreate something that Akka itself provides out-of-the-box: Akka Clustering. We will switch to it in the next part of the tutorial, I promise!

With all the Client talking setup, we have only to adjust the ServerActor, that we left in a pretty useless implementation:
net/microservices/tutorial/actorserverservice/actors/ServerActor.kt:

...
 override fun createReceive(): Receive {
        return ReceiveBuilder()
                .match(String::class.java, { s -> s != "OK" }) { s ->
                    logger.info("Received " + s)
                    sender.tell("OK", self)
                }
                .match(AkkaMessage::class.java) {
                    message ->
                    logger.info("Received $message" )
                    val response = executeCommand(message)
                    logger.info("Sending $response" )
                    sender.tell(response, self)
                }
                .build()
    }

    private fun executeCommand(message: AkkaMessage): AkkaResponse {
        var done: Boolean = false
        when (message.command) {
            Command.CREATE -> {
                if (!users.contains(message.user.id)) {
                    users.put(message.user.id!!, message.user)
                    done = true
                }
            }
            Command.READ -> {
                if (users.contains(message.user.id)) {
                    done = true
                }
            }
            Command.UPDATE -> {
                if (users.contains(message.user.id)) {
                    done = true
                }
            }
            Command.DELETE -> {
                if (users.contains(message.user.id)) {
                    users.remove(message.user.id!!)
                    done = true
                }
            }
        }
        return AkkaResponse(done, message.id)
    }
...

When the actor receive an AkkaMessage message, it will try to execute the received command with the included user. The class contains a Map with the users currently created: modify or read a user whose id is not registered (READ/UPDATE/DELETE) will fire a “fail” response; the same on attempt to create a user whose id is already registered. Otherwise, a “success” response will be sent.

image creation

Let’s update the docker configuration.
docker/pom.xml:

...
  <properties>
        <!-- PLUGINS -->
        <docker.maven.plugin.fabric8.version>0.21.0</docker.maven.plugin.fabric8.version>
        <!-- CONFIGURATIONS -->
        <docker.repo>(YOUR-REPO-NAME)</docker.repo>
        <!-- services properties -->
        <registration.service.port>1111</registration.service.port>
        <persistence.service.port>2222</persistence.service.port>
        <timeconsuming.service.port>3333</timeconsuming.service.port>
        <configuration.service.port>4444</configuration.service.port>
        <actorserver.service.port>5555</actorserver.service.port>
        <actorclient.service.port>6666</actorclient.service.port>
        <!-- Images to build and run - default to all-->
        <images>registrationservice, persistenceservice, timeconsumingservice, configurationservice,
            actorserverservice, actorclientservice
        </images>
    </properties>

    <profiles>
        <profile>
            <id>configuration</id>
            <properties>
                <images>registrationservice, persistenceservice, timeconsumingservice, configurationservice
                </images>
            </properties>
        </profile>
        <profile>
            <id>akka</id>
            <properties>
                <images>registrationservice, actorserverservice, actorclientservice
                </images>
            </properties>
        </profile>
    </profiles>

    <dependencies>
        ...
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>actorclientservice</artifactId>
            <version>0.1</version>
        </dependency>
    </dependencies>
...

You will find the ClientServerService image in the sources.

After installing the actorclientservice artifact and starting docker with the akka profile, we should have the actorclient service registered:

and the image running here
.

Now, we could also see the logs of the running containers with the

mvn docker:logs -P akka

In the logs, you will see the ClientActor sending messages:

and the ServerActor replying them:

Conclusion

Well, you will find the image definition (and all the other missing stuff) in the project:

git clone https://github.com/gitgabrio/container-microservices-tutorial-6.git

In the next part we will add the Akka Cluster implementation so… stay tuned!!!

Any comment and suggestion will be greatly appreciated!!!

Tutorial on Microservices with Kotlin, SpringBoot, Akka and Docker – part 5

Intro

I left you in the previous post with the promise of add Akka services to our architecture; and that’s what will do in this and the next post. As always you may download the full code from the repository:

 git clone https://github.com/gitgabrio/container-microservices-tutorial-5.git

actorserver-service

Let’s begin adding the module declaration to the parent pom:

...
   </parent>
      <modules>
        <module>servicecommon</module>
        <module>registrationservice</module>
        <module>persistenceservice</module>
        <module>timeconsumingservice</module>
        <module>configurationservice</module>
        <module>actorserverservice</module>
        <module>docker</module>
    </modules>
    <properties>
...

This is the actor-service pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <groupId>net.microservices.tutorial</groupId>
        <artifactId>container-microservice</artifactId>
        <version>0.1</version>
        <relativePath>../</relativePath>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>actorserverservice</artifactId>
    <properties>
        <akka.version>2.4.2</akka.version>
        <!-- CONFIGURATIONS -->
        <start-class>net.microservices.tutorial.actorserverservice.ActorServerServer</start-class>
        <finalName>actorserverservice-${project.version}</finalName>
        <service.port>5555</service.port>
        <rs.port>1111</rs.port>
        <rs.ip>localhost</rs.ip>
        <akka.port>12552</akka.port>
        <container.ip>localhost</container.ip>
    </properties>
    <build>
        <finalName>${finalName}</finalName>
    </build>
    <dependencies>
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>servicecommon</artifactId>
            <version>0.1</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.11</artifactId>
            <version>${akka.version}</version>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.11</artifactId>
            <version>${akka.version}</version>
        </dependency>
    </dependencies>

</project>

Here you may notice the akka dependencies and a couple of akkas-specific properties:

  1. akka.port: this is the port where the Akka actor is listening
  2. container.ip: this is the public ip server, used when not injected by the docker plugin

code

In this module we will inject some properties in the eureka metadata map, to make them available for eventual “clients”, namely, the port:

net/microservices/tutorial/actorserverservice/resources/application.yml:

...
eureka:
  client:
    ...
  instance:
    ...
    metadata-map:
      port: ${AKKA_PORT:@akka.port@}
...

We will use this map in the next part of the tutorial; anyway, just note that the value could be populated by the module pom or by the docker plugin.

net/microservices/tutorial/actorserverservice/resources/application.conf:

akka {
  actor {
    provider = "akka.remote.RemoteActorRefProvider"
    serializers {
        proto = "akka.remote.serialization.ProtobufSerializer"
    }
  }
  remote {
    netty.tcp {
      hostname = localhost
      port = 4324
    }
 }
 log-sent-messages = on
 log-received-messages = on
 }

This is a bare-bone akka configuration. Unfortunately Akka does not read YAML files; it could be possible (see here for an example) with some extra code, but I do not think it is important, here, so I provided one as base configuration. Please note that the netty parameters (hostname and port) will be overridden at startup.
The important details are in the configuration class:

net/microservices/tutorial/actorserverservice/configurations/ActorServerConfiguration.kt:

...
     @Value("\${eureka.instance.metadata-map.port}")
    var akkaPort: Int = 0

    @Value("\${akka.remote.netty.tcp.hostname}")
    var akkaHostName: String = ""



    @Bean
    open fun actorSystem(): ActorSystem {
        val defaultApplication: Config = ConfigFactory.defaultApplication()
                .withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(akkaHostName))
                .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(akkaPort))
        val system = ActorSystem.create("RemoteWorkerSystem", defaultApplication)
        system.actorOf(Props.create(ServerActor::class.java), "serverActor")
        return system
    }
...

First, there are two properties injected by Spring, namely

  1. eureka.instance.metadata-map.port -> akkaPort
  2. akka.remote.netty.tcp.hostname -> akkaHostName

Then, in the ActorSystem instantiation, we define its configuration

defaultApplication: Config = ConfigFactory.defaultApplication()

we are loading the default configuration, that will read application.conf;

.withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(akkaHostName))

we are overriding the akka.remote.netty.tcp.hostname value;

.withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(akkaPort))

we are overriding the akka.remote.netty.tcp.port value;

val system = ActorSystem.create("RemoteWorkerSystem", defaultApplication)

we instantiate the ActorSystem whose “akka-address” will be RemoteWorkerSystem

system.actorOf(Props.create(ServerActor::class.java), "serverActor")

we instantiate the ServerActor that will listen and respond to incoming messages.
And that’s is the bare-minimum akka implementation, for now.

image creation

Let’s update the docker configuration.
docker/pom.xml:

...
 <properties>
      ...
        <!-- services properties -->
        <registration.service.port>1111</registration.service.port>
        <persistence.service.port>2222</persistence.service.port>
        <timeconsuming.service.port>3333</timeconsuming.service.port>
        <configuration.service.port>4444</configuration.service.port>
        <actorserver.service.port>5555</actorserver.service.port>
    </properties>
</properties>

    <dependencies>
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>registrationservice</artifactId>
            <version>0.1</version>
        </dependency>
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>persistenceservice</artifactId>
            <version>0.1</version>
        </dependency>
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>timeconsumingservice</artifactId>
            <version>0.1</version>
        </dependency>
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>configurationservice</artifactId>
            <version>0.1</version>
        </dependency>
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>actorserverservice</artifactId>
            <version>0.1</version>
        </dependency>
    </dependencies>
...

but this time we will introduce some modifications. As you may have experienced, each docker container take a lot of resources, even if the actual code inside it is pretty small, as in this tutorial. There is a way to build and start only a subset of the images actually configured, with the docker plugin configuration tag filter; here you may define the image or the comma-separated list of images to manage.

...
<configuration>
   <filter>(IMAGES-TO-INCLUDE)</filter>
....

.
Now, to have a flexible build configuration, I have defined a global images property:

...
   <actorserver.service.port>5555</actorserver.service.port>
        <!-- Images to build and run - default to all-->
        <images>registrationservice, persistenceservice, timeconsumingservice, configurationservice,
            actorserverservice
        </images>
    </properties>
...

and a couple of profiles to eventually override it at build-time:

...
<profiles>
        <profile>
            <id>configuration</id>
            <properties>
                <images>registrationservice, persistenceservice, timeconsumingservice, configurationservice
                </images>
            </properties>
        </profile>
        <profile>
            <id>akka</id>
            <properties>
                <images>registrationservice, actorserverservice
                </images>
            </properties>
        </profile>
    </profiles>
...

Last, I use the property in the plugin configuration:

...
<build>
        <plugins>
            <!-- DOCKERIZE WITH FABRIC8 -->
            <plugin>
                <!-- The Docker Maven plugin is used to create docker image with the fat jar -->
                <groupId>io.fabric8</groupId>
                <artifactId>docker-maven-plugin</artifactId>
                <version>${docker.maven.plugin.fabric8.version}</version>
                <configuration>
                    <logDate>default</logDate>
                    <autoPull>true</autoPull>
                    <filter>${images}</filter>
                    <images>
...

Now, depending on the profile (or lack of) used for the docker goal, we could decide which image(s) build and run.

After installing the actorserverservice artifact and starting docker with the akka profile, we should have the actorserver service registered:

and the image running here
.
In the logs, you will see the akka instance listening:

Conclusion

Well, you will find the image definition (and all the other missing stuff) in the project:

git clone https://github.com/gitgabrio/container-microservices-tutorial-5.git

In the next part we will add the Akka Client service so… stay tuned!!!

Any comment and suggestion will be greatly appreciated!!!

Tutorial on Microservices with Kotlin, SpringBoot, Akka and Docker – part 4

Intro

In the previous post we have add a microservice that returns data slowly,the timeconsuming one. Now we will add the service that query with other twos to show data. We can pretend this is the “frontend” layer, maybe (in real application) full of javascript magics! Only the crucial aspects of the implementation will be covered here, so you may download the full code from the repository:

 git clone https://github.com/gitgabrio/container-microservices-tutorial-4.git

configuration-service

Let’s begin adding the module declaration to the parent pom:

...
   </parent>
      <modules>
        <module>servicecommon</module>
        <module>registrationservice</module>
        <module>persistenceservice</module>
        <module>timeconsumingservice</module>
         <module>configurationservice</module>
        <module>docker</module>
    </modules>
    <properties>
...

This is the configuration-service pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>container-microservice</artifactId>
        <groupId>net.microservices.tutorial</groupId>
        <version>0.1</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>configurationservice</artifactId>

    <properties>
        <start-class>net.microservices.tutorial.configurationservice.ConfigurationServer</start-class>
        <finalName>configurationservice-${version}</finalName>
        <service.port>4444</service.port>
        <container.ip>localhost</container.ip>
        <rs.port>1111</rs.port>
        <rs.ip>localhost</rs.ip>
        <ps.port>2222</ps.port>
        <ps.ip>localhost</ps.ip>
        <ts.port>3333</ts.port>
        <ts.ip>localhost</ts.ip>
    </properties>
    <build>
        <finalName>${finalName}</finalName>
    </build>
    <dependencies>
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>servicecommon</artifactId>
            <version>0.1</version>
        </dependency>
    </dependencies>
</project>

code

In this module we will actually use the Eureka architecture to

  1. ask the registrar for the address of the first available instance of the target service
  2. send request to the found instance of the target service

net/microservices/tutorial/configurationservice/resources/application.yml:

...
   persistenceservice:
  url: http://PERSISTENCE-SERVICE
timeconsumingservice:
  url: http://TIMECONSUMING-SERVICE
...

This is the first piece of the implementation: we are defining two properties to map the name of the services we want to talk with. As you may recall, this is the same name (upper case) we have defined for the services with the directive spring.application.name in the bootstrap.yml files.This properties are injected in the configuration file (forgive the naming! ):
net/microservices/tutorial/configurationservice/configurations/ConfigurationConfiguration.kt:

...
     @Value("\${persistenceservice.url}")
    var persistenceServiceUrl :String = ""

    @Value("\${timeconsumingservice.url}")
    var timeConsumingserviceUrl :String = ""

    /**
     * A customized RestTemplate that has the ribbon load balancer build in.
     * @return
     */
    @LoadBalanced
    @Bean
    open fun restTemplate(): RestTemplate {
        return RestTemplate()
    }

    /**
     * A customized RestTemplate that has the ribbon load balancer build in.
     * @return
     */
    @LoadBalanced
    @Bean
    open fun asyncRestTemplate(): AsyncRestTemplate {
        return AsyncRestTemplate()
    }
...

Please note also the AsyncRestTemplate, more about it later.

The service class forward the requests coming from the controller to the target -remote – destinations:

net/microservices/tutorial/configurationservice/services/UsersService.kt:

...
    @Service
class UsersService(persistenceServiceUrl: String, timeConsumingserviceUrl: String) {

    @Autowired
    @LoadBalanced
    private var restTemplate: RestTemplate? = null

    @Autowired
    @LoadBalanced
    private var asyncRestTemplate: AsyncRestTemplate? = null

    private var persistenceServiceUrl: String

    private var timeConsumingserviceUrl: String

    private var logger = Logger.getLogger(UsersService::class.java.name)

    init {
        this.persistenceServiceUrl = if (persistenceServiceUrl.startsWith("http"))
            persistenceServiceUrl
        else
            "http://" + persistenceServiceUrl
        this.timeConsumingserviceUrl = if (timeConsumingserviceUrl.startsWith("http"))
            timeConsumingserviceUrl
        else
            "http://" + timeConsumingserviceUrl
    }

    /**
     * The RestTemplate works because it uses a custom request-factory that uses
     * Ribbon to look-up the service to use. This method simply exists to show
     * this.
     */
    @PostConstruct
    fun demoOnly() {
        // Can't do this in the constructor because the RestTemplate injection
        // happens afterwards.
        logger.warning("The RestTemplate request factory is " + restTemplate!!.requestFactory.javaClass)
    }

    @Throws(Exception::class)
    fun findAll(): List<UserDTO>? {
        logger.info("findAll() invoked")
        var users: Array<UserDTO>? = null
        try {
            users = restTemplate!!.getForObject(persistenceServiceUrl + "/persons/", Array<UserDTO>::class.java)
        } catch (e: HttpClientErrorException) { // 404
            // Nothing found
            return null
        }
        if (users == null || users.size == 0)
            return null
        else
            return Arrays.asList(*users)
    }
...

    @Throws(Exception::class)
    fun findAsyncByNumber(id: Int): UserDTO? {
        logger.info("findAsyncByNumber() invoked")
        var user: UserDTO? = null
        try {
            val method = HttpMethod.GET
            val responseType = genericClass<UserDTO>()
            //create request entity using HttpHeaders
            val headers = HttpHeaders()
            headers.contentType = MediaType.TEXT_PLAIN
            val requestEntity = HttpEntity<String>("params", headers)
            val future = asyncRestTemplate?.exchange(timeConsumingserviceUrl + "/deferredpersons/{id}", method, requestEntity, responseType, id)
            //waits for the result
            val entity = future?.get()
            //prints body source code for the given URL
            user = entity?.body
        } catch (e: Exception) {
            logger.log(Level.SEVERE, e.message, e)
        }
        return user
    }
    
    private inline fun <reified T : Any> genericClass(): Class<T> = T::class.java
...

Well, there is something going on here.
First, we inject the services “url” in the contructor and use this to set the two instance variables persistenceServiceUrl and timeConsumingserviceUrl.
Next, we define the two variables restTemplate and asyncRestTemplate as Autowired and LoadBalanced. This last annotation tells to the RestTemplate to use a LoadBalancerClient. With this in place, each service’ request is forwarded to the first available instance of the target type of destination.
The AsyncRestTemplate is used to talk to the timeconsuming service because it return a ListenableFuture, that is what we need to make this type of requests.

The controller is like the others, so I will skip the details

image creation

Let’s update the docker configuration.
docker/pom.xml:

...
 <properties>
      ...
        <!-- services properties -->
        <registration.service.port>1111</registration.service.port>
        <persistence.service.port>2222</persistence.service.port>
        <timeconsuming.service.port>3333</timeconsuming.service.port>
        <configuration.service.port>4444</configuration.service.port>
    </properties>
</properties>

    <dependencies>
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>registrationservice</artifactId>
            <version>0.1</version>
        </dependency>
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>persistenceservice</artifactId>
            <version>0.1</version>
        </dependency>
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>timeconsumingservice</artifactId>
            <version>0.1</version>
        </dependency>
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>configurationservice</artifactId>
            <version>0.1</version>
        </dependency>
    </dependencies>
...

This time, in the image definition we will add three links:

...
registrationservice:rs, persistenceservice:ps, timeconsumingservice:ts
...

With this definition, the fabric8 plugin will inject a bunch of RS_XXX, PS_XXX and TS_XXX variables. Actually, for our project only the RS_XXX ones are needed, because the configuration service never call directly the others, but instead “ask” the address to the registration service.

After installing the artifacts and starting docker, we should have the configuration service registered:

and the image running here

with the persistence service requests

and the timeconsuming ones (of course, actual data will vary)

Conclusion

Well, you will find the image definition (and all the other missing stuff) in the project:

git clone https://github.com/gitgabrio/container-microservices-tutorial-4.git

In the next part we will begin to add Akka services so… stay tuned!!!

Any comment and suggestion will be greatly appreciated!!!

Tutorial on Microservices with Kotlin, SpringBoot, Akka and Docker – part 3

Intro

In the previous post we have built our first microservice, the persistence one. Now we will add a new one to simulate a long lasting requesting, i.e. a request that may take long time to be fulfilled.  Only the crucial aspects of the implementation will be covered here, so you may download the full code from the repository:

 git clone https://github.com/gitgabrio/container-microservices-tutorial-3.git

timeconsuming-service

For this tutorial we will reuse the User entity defined in service-common.Let’s begin adding the module declaration to the parent pom:

...
   </parent>
      <modules>
        <module>servicecommon</module>
        <module>registrationservice</module>
        <module>persistenceservice</module>
        <module>timeconsumingservice</module>
        <module>docker</module>
    </modules>
    <properties>
...

This is the timeconsuming-service pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>container-microservice</artifactId>
        <groupId>net.microservices.tutorial</groupId>
        <version>0.1</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>timeconsumingservice</artifactId>

    <properties>
        <start-class>net.microservices.tutorial.timeconsumingservice.TimeConsumingServer</start-class>
        <finalName>timeconsumingservice-${project.version}</finalName>
        <service.port>3333</service.port>
        <container.ip>localhost</container.ip>
        <rs.port>1111</rs.port>
        <rs.ip>localhost</rs.ip>
    </properties>
    <build>
        <finalName>${finalName}</finalName>
    </build>
    <dependencies>
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>servicecommon</artifactId>
            <version>0.1</version>
        </dependency>
    </dependencies>

</project>

code

Most of the code and configuration is really similar to the persistence-service one, so I won’t repeat the details. But there are some differences in the Controller and Service related to the asynchronous call:
net/microservices/tutorial/timeconsumingservice/services/TimeConsumingService.kt:

...
    @Throws(Exception::class)
    fun findAll(): List<UserDTO>? {
        logger.info("findAll() invoked")
        val toReturn: ArrayList<UserDTO> = ArrayList()
        repeat(6) { i ->
            val toAdd: UserDTO = createUserEntity(i)
            toReturn.add(toAdd)
            try {
                Thread.sleep(1000)
            } catch (e : InterruptedException) {
                logger.info("I've been interrupted")
            }
        }
        return toReturn
    }
...

This service returns five users just created on demand, but with some delay, to simulate a time consuming elaboration (it should take almost one minute to complete).

net/microservices/tutorial/timeconsumingservice/controllers/TimeConsumingController.kt:

...
    /*
     Use for long-polling
     */
    @RequestMapping("/deferredpersons")
    @Async // This means it will run on a different thread
    fun allDeferredUsers(): DeferredResult<List<UserDTO>?> {
        logger.info("web-service allDeferredUsers() invoked")
        val toReturn: DeferredResult<List<UserDTO>?> = DeferredResult()
        Thread({
            val users = timeConsumingService.findAll()
            logger.info("web-service allDeferredUsers() found: " + users!!)
            toReturn.setResult(users)
        }, "MyThread-$counter").start()
        counter += 1
        return toReturn
    }
...

This controller is actually implementing a long polling, returning a Spring Web DeferredResult.Basically, the actual data is populated inside the thread, and the client calling this method should do it in asynchronous manner, as we will see in the next part.

image creation

The docker image management is identical as what done before for the other services, beside the different image name and listening port.
docker/pom.xml:

...
 <properties>
      ...
        <!-- services properties -->
        <registration.service.port>1111</registration.service.port>
        <persistence.service.port>2222</persistence.service.port>
        <timeconsuming.service.port>3333</timeconsuming.service.port>
    </properties>
</properties>

    <dependencies>
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>registrationservice</artifactId>
            <version>0.1</version>
        </dependency>
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>persistenceservice</artifactId>
            <version>0.1</version>
        </dependency>
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>timeconsumingservice</artifactId>
            <version>0.1</version>
        </dependency>
    </dependencies>
...

After installing the artifacts and starting docker, we should have the timeconsuming service registered:

and the image running here;

with the time consuming service; here’s the result (of course, actual data will vary):

Conclusion

Well, you will find the &ltg;image&gt, definition (and all the other missing stuff) in the project:

git clone https://github.com/gitgabrio/container-microservices-tutorial-3.git

In the next part we will add another Eureka client container that will talk with persistence and the timeconsuming services so… stay tuned!!!

Any comment and suggestion will be greatly appreciated!!!

Tutorial on Microservices with Kotlin, SpringBoot, Akka and Docker – part 2

Intro

In the previous post we have built the scaffold of our microservice project, implementing the Eureka registration service. Now we will begin ti add some real functionality, beginning (surprisingly) with the data access. Only the crucial aspects of the implementation will be covered here, so you may download the full code from the repository:

git clone https://github.com/gitgabrio/container-microservices-tutorial-2.git

persistence-service

For this tutorial we will create a simple entity, User, with the basic CRUD operations. We will use HsqlDb as in-memory database and JPA for ORM-mapping, with the utilities offered by Spring-Data. Let’s begin adding the module declaration to the parent pom:

...
   </parent>
      <modules>
        <module>servicecommon</module>
        <module>registrationservice</module>
        <module>persistenceservice</module>
        <module>docker</module>
    </modules>
    <properties>
...

This is the persistence-service pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>container-microservice</artifactId>
        <groupId>net.microservices.tutorial</groupId>
        <version>0.1</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>persistenceservice</artifactId>

    <properties>
        <start-class>net.microservices.tutorial.persistenceservice.PersistenceServer</start-class>
        <finalName>registrationservice-${project.version}</finalName>
        <service.port>2222</service.port>
        <container.ip>localhost</container.ip>
        <rs.port>1111</rs.port>
        <rs.ip>localhost</rs.ip>
    </properties>

    <dependencies>
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>servicecommon</artifactId>
            <version>0.1</version>
        </dependency>
    </dependencies>

    <build>
        <finalName>${finalName}</finalName>
    </build>

</project>

Inside the resources directory we will put all the html templates used by Thymeleaf and the Spring configurations file: bootstrap.yml and application.yml:

persistenceservice/resources/bootstrap.yml:

# Spring properties
# Service registers under this name
spring:
  application:
      name: persistence-service

persistenceservice/resources/application.yml:

# Spring properties
spring:
  # Ignore Eureka dashboard FreeMarker templates
  freemarker:
      enabled: false
  # Allow Thymeleaf templates to be reloaded at runtime
  thymeleaf:
       cache: false
  # Trailing / mandatory
  # Template location for this application only
       prefix: classpath:/templates/

  # Database configuration
  # Spring Boot automatically creates a JPA EntityManagerFactory using Hibernate
  # but we need to override some defaults:
  #
  #   1. Stop Hibernate automatically creating a schema, we are doing it in
  #      schema.sql. Instead check the tables match their JPA mapped classes
  jpa:
    hibernate:
      ddl-auto: none
      naming_strategy: org.hibernate.cfg.ImprovedNamingStrategy
    database: HSQL
    show-sql: true

# Map the error path to error template (for Thymeleaf)
error:
  path: /error

# HTTP Server
# HTTP (Tomcat) port
server:
  port:  ${SERVICE_PORT:@service.port@}
eureka:
  client:
    serviceUrl:
      defaultZone: http://${RS_PORT_1111_TCP_ADDR:@rs.ip@}:${RS_PORT_1111_TCP_PORT:@rs.port@}/eureka/
  instance:
    preferIpAddress: true
    # DO NOT DO THIS IN PRODUCTION
    leaseRenewalIntervalInSeconds: 5

#Test db
testDatasource:
  url: jdbc:hsqldb:file:testdb
  username: SA
  password:
  driverClassName: org.hsqldb.jdbcDriver

# Disabling security for Actuator' REST services
security:
  basic:
    enabled: false
management:
  security:
    enabled: false

code

Since this tutorial is focused on the microservices architecture, I will not explain all the details of this configuration: fell free to ask for further explanation, if needed.
Please note:

  • port: ${SERVICE_PORT:@service.port@}: this is the listening port of the application; as for the registration-server, will be injected at compile time;
  • defaultZone:http://${RS_PORT_1111_TCP_ADDR:@rs.ip@}:${RS_PORT_1111_TCP_PORT:@rs.port@}/eureka/: this is the address of the registration service; RS_PORT_1111_TCP_ADDR and RS_PORT_1111_TCP_PORT will be dynamically injected by the fabric8 plugin during image building (more about them in the image configuration), otherwise rs.ip and rs.port (defined in the persistenceservice pom.xml)will be used
  • preferIpAddress: true: this is needed mostly for applications running inside docker containers; without this, the service will register itself with the container id, so it could not be attainable by the other services

persistenceservice/src/main/kotlin/net/microservices/tutorial/persistenceservice/PersistenceServer.kt

@file:JvmName("PersistenceServer")
package net.microservices.tutorial.persistenceservice


import net.microservices.tutorial.persistenceservice.configurations.PersistenceConfiguration
import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.cloud.client.discovery.EnableDiscoveryClient
import org.springframework.context.annotation.Import

import java.util.logging.Logger

/**
 * Run as a micro-service, registering with the Discovery Server (Eureka).
 *
 *
 * Note that the configuration for this application is imported from
 * [PersistenceConfiguration]. This is a deliberate separation of concerns.
 */
@EnableAutoConfiguration
@EnableDiscoveryClient
@Import(PersistenceConfiguration::class)
open class PersistenceServer {

    protected var logger = Logger.getLogger(PersistenceServer::class.java.name)

    companion object {

        /**
         * Run the application using Spring Boot and an embedded servlet engine.

         * @param args
         * *            Program arguments - ignored.
         */
        @JvmStatic fun main(args: Array<String>) {
            SpringApplication.run(PersistenceServer::class.java, *args)
        }
    }
}

This is the application entry point. As for the registration service, there is a single annotation needed to transform the module in an Eureka client:

@EnableDiscoveryClient

With this in place, we are telling Spring to instantiate the class as a Eureka client, taking the properties from the *.yml files.

image creation

What is left now is to configure the image to be built inside the docker pom docker/pom.xml: first, as the dependency on the persistence service:

<dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>persistenceservice</artifactId>
            <version>0.1</version>
        </dependency>

then add this just below the registration service’ one:
docker/pom.xml:

...
                        </image>
            <!-- Persistence service -->
                        <image>
                            <!-- Alias name which can used for linking containers during runtime -->
                            <alias>persistenceservice</alias>
                            <name>${docker.repo}/persistence-service:${project.version}</name>
                            <!-- ....................................... -->
                            <!-- Build configuration for creating images -->
                            <!-- ....................................... -->
                            <build>
                                <from>java:8u40</from>
                                <!-- Assembly descriptor holds the reference to the created artifact-->
                                <assembly>
                                    <descriptor>${basedir}/../persistenceservice/src/main/fabric8/assembly.xml</descriptor>
                                </assembly>
                                <!-- Expose ports -->
                                <ports>
                                    <port>${persistence.service.port}</port>
                                </ports>
                                <!-- Default command for the build image -->
                                <cmd>java -Djava.security.egd=file:/dev/./urandom -jar /maven/persistenceservice.jar
                                </cmd>
                            </build>
                            <!-- ............................................................... -->
                            <!-- Runtime configuration for starting/stopping/linking containers -->
                            <!-- ............................................................... -->
                            <run>
                                <!-- Assign dynamically mapped ports to maven variables (which can be reused in integration tests) -->
                                <ports>
                                    <port>${persistence.service.port}:${persistence.service.port}</port>
                                </ports>
                                <env>
                                    <SERVICE_PORT>${persistence.service.port}</SERVICE_PORT>
                                </env>
                                <wait>
                                    <!-- Check for this URL to return a 200 return code .... -->
                                    <url>http://${docker.host.address}:${persistence.service.port}/</url>
                                     <!-- ... but at max 30 seconds -->
                                    <time>30000</time>
                                </wait>
                                <dependsOn>
                                    <container>
                                        registrationservice
                                    </container>
                                </dependsOn>
                                <links>
                                    <!-- Links can be referenced via alias (registrationservice) or name (${docker.repo}/registration-service:${project.version}) -->
                                    <!-- THIS SHOULD CREATE SOME RS_XXX variables -->
                                    <link>registrationservice:rs</link>
                                </links>
                                <log>
                                    <prefix>TC</prefix>
                                    <color>cyan</color>
                                </log>
                            </run>
                        </image>
                    </images>
...

This is almost identical to the registration service, but there are a couple of new tags:

  • dependsOn: this is to indicate the container dependency, i.e. we are telling to the plugin that the container for this image requires the registrationservice container
  • links: this is needed to inject in the persistence service container some properties of the registration service one

In the previous post I have explained how to inject properties inside container as environment variables, and that some of them are automatically generated and injected by the fabric8 plugin. In the documentation you will find more explanation; simply put, with  &lt;link&gt;registrationservice:rs&lt;/link&gt; we are telling to the plugin to use “rs” as prefix for some automatically generated properties, with the following convention (everything upper-case):

(prefix)_PORT=(protocol)://(container_ip):(exposed_port_number)
(prefix)_PORT_(port_number)_TCP=tcp://(container_ip):(port_number)
(prefix)_PORT_(port_number)_TCP_PROTO=tcp
(prefix)_PORT_(port_number)_TCP_PORT=(port_number)
(prefix)_PORT_(port_number)_TCP_ADDR=(container_ip)

For the current project, the port number is 1111 (registration.service.port), the prefix is rs, and let’s say that the registration container ip is 172.17.0.2. Following the above rules, we will have:


RS_PORT=tcp://172.17.0.2:1111
RS_PORT_1111_TCP=tcp://172.17.0.2:1111
RS_PORT_1111_TCP_PROTO=tcp
RS_PORT_1111_TCP_PORT=1111
RS_PORT_1111_TCP_ADDR=172.17.0.2

This properties will be injected as environment variables in the persistence service container. Now, if you look back at application.yml, you will find exactly these:

...
defaultZone: http://${RS_PORT_1111_TCP_ADDR}:${RS_PORT_1111_TCP_PORT}/eureka/
...

Now, calling from the root directory

 
mvn install 

We should be able to build all the modules and the two images.
Last, let’s start them all. Go to the docker directory and type

mvn docker:start

We should have:

the two  containers up and running


# docker ps
CONTAINER ID IMAGE                        COMMAND                CREATED            STATUS            PORTS                  NAMES
36909d6973b0 ***/persistence-service:0.1 "/bin/sh -c 'java -Dj"  About a minute ago Up About a minute 0.0.0.0:2222->2222/tcp distracted_varahamihira
3f8bebf677ca ***/registration-service:0.1 "/bin/sh -c 'java -Dj" 2 minutes ago      Up About a minute 0.0.0.0:1111->1111/tcp jovial_galileo


the registration service  here

the persistence service here

the persistence service registered inside the registration service

some REST for CRUD operation

Conclusion

Well, feel free to look in the code to see what REST services are implemented
To clone the project:

git clone https://github.com/gitgabrio/container-microservices-tutorial-2.git

In the next part we will add another Eureka client container that will implement a long time request service so… stay tuned!!!

Any comment and suggestion will be greatly appreciated!!!

Tutorial on Microservices with Kotlin, SpringBoot, Akka and Docker – part 1

Intro

Hi all! Recently I have begun my journey toward the “microservices” world, and I have found out that there are actually a lot of different ways to implement that simple concept, i.e. split a complex project in small, well focused, components. So, I decided to create a “microservices” architecture stub that could be used as a model for *real* implementations. In that stub I will put different technologies, not necessarily to be always used, but to work as template. This tutorial will follow step-by-step the project development: I will start from a multi-module SpringBoot project and “Dockerize” it… have fun!!!!

Tutorial parts

  1. Project setup and registration service
  2. Accessing db with persistence service
  3. Long task to be consumed asynchronously with timeconsuming service
  4. Asking data to other services with configuration service
  5. Adding an Akka server with actorserver service
  6. Create the Akka client service to talk with the actoreserver

Frameworks and architecture

The project will be written in Kotlin (there will be very few lines of codes, so it should not be an issues), and will use Spring Cloud as framework and Docker as container manager. The microservices orchestration will be entrusted to the Eureka service. I would like to manage everything, even the Docker images, with maven goals, so I have looked around for available plugins. At the end I have choosen the RedHat Fabric8 plugin, of which you can read here.

 System requirements

All the development will be done on an OpenSuSE Linux, where I have installed Docker (1.12.6) and Oracle JDK 1.8.0_65. Last, I use IntelliJ IDEA as IDE, but the project will be 100% Maven-based, there should be no issues with other IDEs. Check that your Docker environment is working before proceeding.

Ready? Go on….

Setup project

You may download the full code from the repository:

git clone https://github.com/gitgabrio/container-microservices-tutorial-1.git

service-parent (parent pom/multimodule)

The project will be a Maven multimodule, so let’s begin creating the base pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>net.microservices.tutorial</groupId>
    <artifactId>container-microservice</artifactId>
    <version>0.1</version>
    <packaging>pom</packaging>
    <parent>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-parent</artifactId>
        <version>Dalston.RELEASE</version>
        <relativePath></relativePath>
    </parent>
    <properties>
        <!-- TEST -->
        <junit.version>4.12</junit.version>
        <!-- LOG -->
        <slf4j.version>1.7.12</slf4j.version>
        <!-- PLUGINS -->
        <maven-compiler-plugin.version>3.1</maven-compiler-plugin.version>
        <maven-surefire-plugin.version>2.19.1</maven-surefire-plugin.version>
        <maven-gpg-plugin.version>1.6</maven-gpg-plugin.version>
        <!-- CONFIGURATIONS -->
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.source>1.8</java.source>
        <java.target>1.8</java.target>
        <kotlin.version>1.1.0</kotlin.version>
    </properties>
    <dependencies>
        <dependency>
            <!-- Setup Spring Boot -->
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <!-- Setup Spring MVC & REST, use Embedded Tomcat -->
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-thymeleaf</artifactId>
        </dependency>
        <dependency>
            <!-- Setup Spring Data common components -->
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-commons</artifactId>
        </dependency>
        <dependency>
            <!-- Testing starter -->
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <!-- Setup Spring Data JPA Repository support -->
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <!-- In-memory database for testing/demos -->
            <groupId>org.hsqldb</groupId>
            <artifactId>hsqldb</artifactId>
        </dependency>
        <dependency>
            <!-- Spring Cloud starter -->
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter</artifactId>
        </dependency>
        <dependency>
            <!-- Eureka service registration -->
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-eureka-server</artifactId>
        </dependency>
        <dependency>
            <!-- Kotlin -->
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-stdlib</artifactId>
            <version>${kotlin.version}</version>
        </dependency>
    </dependencies>
    <build>
        <sourceDirectory>src/main/kotlin</sourceDirectory>
        <testSourceDirectory>src/test/kotlin</testSourceDirectory>
        <resources>
            <resource>
                <filtering>true</filtering>
                <directory>src/main/resources</directory>
                <includes>
                    <include>**/*.xml</include>
                    <include>**/*.yml</include>
                    <include>**/*.sql</include>
                    <include>**/*.html</include>
                    <include>**/*.conf</include>
                    <include>**/*.properties</include>
                </includes>
            </resource>
        </resources>
        <testResources>
            <testResource>
                <directory>src/test/resources</directory>
            </testResource>
        </testResources>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>${maven-compiler-plugin.version}</version>
                <configuration>
                    <source>${java.source}</source>
                    <target>${java.target}</target>
                    <maxmem>256M</maxmem>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.jetbrains.kotlin</groupId>
                <artifactId>kotlin-maven-plugin</artifactId>
                <version>${kotlin.version}</version>
                <executions>
                    <execution>
                        <id>compile</id>
                        <phase>process-sources</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <sourceDirs>
                                <source>src/main/kotlin</source>
                            </sourceDirs>
                        </configuration>
                    </execution>
                    <execution>
                        <id>test-compile</id>
                        <phase>process-test-sources</phase>
                        <goals>
                            <goal>test-compile</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-gpg-plugin</artifactId>
                <version>${maven-gpg-plugin.version}</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

That’s it. I have declared spring-cloud-starter-parent as parent to have available all the SpringBoot + Eureka stack out-of-the-box.
For the moment being, it is just an empty, useless box, but we will begin to fill it immediately.

service-common

First of all, let’s create a “common” module: it will be used to contain classes and utilities used by real “microservices”. I have read somewhere that there should be absolutely no code-linking between the microservices, i.e. they should not have any code in common: clearly this “service-common” breaks the rule, and I understand the sense in keeping code-bases completely isolated between them, in real situations, with really complex projects. But on the other side I think that for this tutorial it could be accepted, just to avoid repeated code.
Let’s add it as “module” to the “parent” pom, just below the close of the “parent” tag, in the parent pom:

... 
</parent>
<modules>
    <module>servicecommon</module>
</modules>
<properties>
...

Here’s the “servicecommon” pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <artifactId>servicecommon</artifactId>
    <packaging>jar</packaging>
    <parent>
        <groupId>net.microservices.tutorial</groupId>
        <artifactId>container-microservice</artifactId>
        <version>0.1</version>
        <relativePath>../</relativePath>
    </parent>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>1.5.2.RELEASE</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Please note the spring-boot-maven-plugin configuration, to avoid “repackage” goal to be executed in this module.

registration-service

The registration service will be the first “microservice” to be implemented. Add it as a module, just below the “servicecommon”, in the parent pom:

... 
</parent>
<modules>
  <module>servicecommon</module>
  <module>registrationservice</module>
</modules>
<properties>
...

Now, let’s create the “registrationservice” pom.xml:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <artifactId>registrationservice</artifactId>
    <packaging>jar</packaging>
    <parent>
        <groupId>net.microservices.tutorial</groupId>
        <artifactId>container-microservice</artifactId>
        <version>0.1</version>
        <relativePath>../</relativePath>
    </parent>
    <properties>
        <start-class>net.microservices.tutorial.registrationservice.RegistrationServer</start-class>
        <finalName>registrationservice-${project.version}</finalName>
        <service.port>1111</service.port>
        <container.ip>localhost</container.ip>
    </properties>
    <build>
        <finalName>${finalName}</finalName>
    </build>
</project>

Please take note of the service.port and container.ip properties: more about them later.

code

We will use the Eureka protocol for microservices orchestration. Simply put, there is (at least) one service that act as “registrar” to keep note of all the other running instances (microservice type and ip). All other services are “client”s, and register themselves with the registration service. Whenever one service needs to talk with another one, it asks the registration service for the address of the first available instance (there is a load-balancing in there) of that type of service, and then use the received address for the communication.
Using SpringBoot dark magic, setting up a registrar is a matter of few lines of yml and annotations:
registrationservice/src/main/resources/application.yml

# Configure this Discovery Server
eureka:
  instance:
    hostname: ${CONTAINER_IP:@container.ip@}
    #enableSelfPreservation: false
    preferIpAddress: true
    # DO NOT DO THIS IN PRODUCTION
    leaseRenewalIntervalInSeconds: 5
  client:
    registerWithEureka: false
    fetchRegistry: false

server:
  port: ${SERVICE_PORT:@service.port@}
# Discovery Server Dashboard uses FreeMarker.  Don't want Thymeleaf templates
spring:
  thymeleaf:
    enabled: false
security:
  basic:
    enabled: false
management:
  security:
    enabled: false

Please note the two “MAVEN-style” properties ${CONTAINER_IP:@container.ip@} and ${SERVICE_PORT:@service.port@}: here we are telling the compiler to use the first variable (CONTAINER_IP, SERVICE_PORT) if available, otherwise the second one (@container.ip@, @service.port@) if not.
CONTAINER_IP and SERVICE_PORT will be populated later during docker image creation, inside the fabric8 plugin configuration, while @container.ip@ and @service.port@ are injected from the pom.xml. With this in place we can run the service stand-alone, starting it with

mvn spring-boot:start

from the registrationservice directory, or inside the docker container, as we will see later.
registrationservice/src/main/kotlin/net/microservices/tutorial/registrationservice/RegistrationServer.kt

@file:JvmName("RegistrationServer")
package net.microservices.tutorial.registrationservice

import org.springframework.boot.SpringApplication
import org.springframework.boot.autoconfigure.EnableAutoConfiguration
import org.springframework.boot.autoconfigure.jdbc.DataSourceAutoConfiguration
import org.springframework.boot.autoconfigure.jdbc.DataSourceTransactionManagerAutoConfiguration
import org.springframework.boot.autoconfigure.orm.jpa.HibernateJpaAutoConfiguration
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer
import org.springframework.context.annotation.ComponentScan
import org.springframework.context.annotation.Configuration

@Configuration
@EnableAutoConfiguration(exclude = arrayOf(DataSourceAutoConfiguration::class, DataSourceTransactionManagerAutoConfiguration::class, HibernateJpaAutoConfiguration::class))
@ComponentScan
@EnableEurekaServer
open class RegistrationServer {

    companion object {

            /**
             * Run the application using Spring Boot and an embedded servlet engine.

             * @param args
             * *            Program arguments - ignored.
             */
            @JvmStatic fun main(args: Array<String>) {
                SpringApplication.run(RegistrationServer::class.java, *args)
            }
        }
}

Please note the open class definition: this is needed, as in all the Kotlin classes managed by Spring, because at runtime Spring creates proxies of these classes, and by default Kotlin classes are final (Kotlin, I love you!).

Actually, all the magic is here:

@EnableEurekaServer

With this annotation we are telling Spring to setup a Eureka Registrar Server, using as properties the ones defined (by default) in the application.yml file.

docker

Last step will be the creation of the docker module.
To cut a long story short, fabric8 plugin uses the maven-assembly-plugin to build the images. Unfortunately, making this plugin work for a multimodule project is a little bit tricky (you may find detailed explanation here), so one solution is to create an “empty” module used just to create the images.
Let’s go back to the parent pom, and add the module declaration:

... 
</parent>
<modules>
  <module>servicecommon</module>
  <module>registrationservice</module>
  <module>docker</module>
</modules>
<properties>
...

Now, the pom.xml:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>container-microservice</artifactId>
        <groupId>net.microservices.tutorial</groupId>
        <version>0.1</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>docker</artifactId>
    <packaging>pom</packaging>
    <properties>
        <!-- PLUGINS -->
        <docker.maven.plugin.fabric8.version>0.21.0</docker.maven.plugin.fabric8.version>
        <!-- CONFIGURATIONS -->
        <docker.repo>(YOUR-DOCKER-REPO-NAME)</docker.repo>
        <!-- services properties -->
        <registration.service.port>1111</registration.service.port>
    </properties>

    <dependencies>
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>registrationservice</artifactId>
            <version>0.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- DOCKERIZE WITH FABRIC8 -->
            <plugin>
                <!-- The Docker Maven plugin is used to create docker image with the fat jar -->
                <groupId>io.fabric8</groupId>
                <artifactId>docker-maven-plugin</artifactId>
                <version>${docker.maven.plugin.fabric8.version}</version>
                <configuration>
                    <logDate>default</logDate>
                    <autoPull>true</autoPull>
                    <images>
                        <!-- Registration service -->
                        <image>
                            <!-- Alias name which can used for linking containers during runtime -->
                            <alias>registrationservice</alias>
                            <name>${docker.repo}/registration-service:${project.version}</name>
                            <!-- ....................................... -->
                            <!-- Build configuration for creating images -->
                            <!-- ....................................... -->
                            <build>
                                <from>java:8u40</from>
                                <!-- Assembly descriptor holds the reference to the created artifact-->
                                <assembly>
                                    <descriptor>${basedir}/../registrationservice/src/main/fabric8/assembly.xml
                                    </descriptor>
                                </assembly>
                                <!-- Expose ports -->
                                <ports>
                                    <port>${registration.service.port}</port>
                                </ports>
                                <!-- Default command for the build image -->
                                <cmd>java -Djava.security.egd=file:/dev/./urandom -jar /maven/registrationservice.jar
                                </cmd>
                            </build>
                            <!-- ............................................................... -->
                            <!-- Runtime configuration for starting/stopping/linking containers -->
                            <!-- ............................................................... -->
                            <run>
                                <!-- Assign dynamically mapped ports to maven variables (which can be reused in integration tests) -->
                                <ports>
                                    <port>${registration.service.port}:${registration.service.port}</port>
                                </ports>
                                <env>
                                    <SERVICE_PORT>${registration.service.port}</SERVICE_PORT>
                                    <CONTAINER_IP>${docker.container.registrationservice.ip}</CONTAINER_IP>
                                </env>
                                <wait>
                                    <!-- Check for this URL to return a 200 return code .... -->
                                    <url>http://${docker.host.address}:${registration.service.port}/</url>
                                    <!-- ... but at max 20 seconds -->
                                    <time>20000</time>
                                </wait>
                                <log>
                                    <prefix>TC</prefix>
                                    <color>cyan</color>
                                </log>
                            </run>
                        </image>
                    </images>
                </configuration>
                <!-- Hooking into the lifecycle -->
                <executions>
                    <execution>
                        <id>docker-build</id>
                        <goals>
                            <goal>build</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>start</id>
                        <phase>pre-integration-test</phase>
                        <goals>
                            <goal>build</goal>
                            <goal>start</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>stop</id>
                        <phase>post-integration-test</phase>
                        <goals>
                            <goal>stop</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <version>1.5.2.RELEASE</version>
                <configuration>
                    <skip>true</skip>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

Well, there is some hot stuff going on here.
Note that the packaging is pom, since there is no code in that project.
Also, please note the dependency on the registrationservice artifact, needed for the assembly plugin to work.
Now, the interesting part.
The guys at Fabric8 decided that all (or almost all) the image configuration (creation and starting) should be inside the pom, so there are a lot a properties reflecting the ones of the standard Dockerfile.
The configuration is made of two parts: build and run.
Inside the build we will put information for the image creation:

  • : the base image (java:8u40 in this case)
  • : list of ports exposed by the container (please note, this is not the  external_port:internal_port mapping)
  • : the command to be executed inside the container
  • : this is the pointer to the maven-assembly plugin descriptor; basically, this descriptor tells maven what should be put inside the built image and what should be the name of the jar (the assembly.xml will be shown below).

Inside the run we will put information for the container starting:

  • : list of  external_port:internal_port mapping
  • : variables injected inside the container
  • : command to execute to check that the container has started successfully.

Beside that, there are some properties that are populated when the container starts, for example:

  • docker.container.(image-name).ip: this contain the “public” ip of the running container;
  • docker.host.address: this contain the host address of the running container (localhost);

To make them available inside the container, we can declare them in the env tag, and the property name will be the one created inside the container; e.g.:

<env> 
   <SERVICE_PORT>${registration.service.port}</SERVICE_PORT>
   <CONTAINER_IP>${docker.container.registrationservice.ip}</CONTAINER_IP>
</env>

Inside the container we will have $SERVICE_PORT (with the registration.service.port value, statically defined in the pom) and $CONTAINER_IP (with the docker.container.registrationservice.ip value, dynamically defined at start time).
There are another couple of run tags used for images linking/dependency, but we will look at them in the next part of the tutorial.

Assembly

The following snippet instruct the plugin to look for the assembly description in a specific file:

...
<assembly>
       <descriptor>${basedir}/../registrationservice/src/main/fabric8/assembly.xml
       </descriptor>
</assembly>
...

The file is inside the registrationservice module:
registrationservice/src/main/fabric8/assembly.xml

<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.2 http://maven.apache.org/xsd/assembly-1.1.2.xsd">
    <!-- Assembly specifying Dockerbuild for fabric8/docker-maven-plugin -->
    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <includes>
                <include>net.microservices.tutorial:registrationservice</include>
            </includes>
            <outputDirectory>.</outputDirectory>
            <outputFileNameMapping>registrationservice.jar</outputFileNameMapping>
        </dependencySet>
    </dependencySets>
</assembly>

Here we are telling maven to include only the registrationservice artifact and to put it inside registrationservice.jar
Note that it is possible to avoid that file, putting its content inside the assembly directive, in the inline tag:

...
<assembly>
    <inline>
    <dependencySets>
        <dependencySet>
            <useProjectArtifact>false</useProjectArtifact>
            <includes>
                <include>net.microservices.tutorial:registrationservice</include>
            </includes>
            <outputDirectory>.</outputDirectory>
            <outputFileNameMapping>registrationservice.jar</outputFileNameMapping>
        </dependencySet>
    </dependencySets>
    </inline>
</assembly>
...

but actually I do prefer to keep it outside – it could be easier to manage, eventually, without cluttering too much the pom itself.

docker image creation, start and stop

Now we have all the pieces in place to build and start our first image, but before proceeding any further remember to replace the (YOUR-DOCKER-REPO-NAME) property with the name of your Docker repository.
Calling

mvn install

from the console, in the project root, will:

  1.  recursively build all the modules in the project
  2. install all the modules
  3. create the reactiveservice image.

The first time that the image is built, the base image will be downloaded (if not already present in the local Docker instance). Soon after the build, the image is tested starting a container and invoking the command defined in the wait directive: if the command is successfull, the build is marked with success. Whenever a new version of the image is built, the plugin takes care of removing the old one.

Now, to start the container, let’s go in the docker directory and type:

mvn docker:start

This will create and start a new container with the image just built and the parameters defined in the run configuration. If you execute the command in debug mode (with the -X parameter) you will see the actual configuration sent (i.e. with all the placeholders replaced) and also the http requests issued to the Docker daemon (and, for free, all the image’ data transferred to it, in hexadecimal format!).
To verify that everything is working, just open the browser here and you should see the Eureka dashboard.

Finally, to stop the container execute

mvn docker:stop

This will stop the container and remove it.

Conclusion

Well, I think we have touched a lot of important topics.
To clone the project:

 git clone https://github.com/gitgabrio/container-microservices-tutorial-1.git

In the next parts we will add other containers, to see how to make Eureka clients talking between them and how to deploy them in different docker images. We will also add some microservices communicating with Akka so… stay tuned!!!

Any comment and suggestion will be greatly appreciated!!!

Tutorial on Linux + Asterisk +MySQL + Java – part 5

This part of the tutorial will talk of the Asterisk-Java’ EventListener.

Beside the cool stuff we have previously seen about the Manager API and Live API, Asterisk-java offers some EventListeners to add more dynamic control over Asterisk behaviour. Some of this listeners belongs to the Manager API, and others to the Live API.
To use the Manager API EventListener, you may:

    implement the ManagerEventListener interface
    extend the AbstractManagerEventListener class

We will use the second method in this tutorial. To implement the listener, you have to:

  1. connect to the Asterisk server (as done in the previous parts)
  2. instantiate your class and add it as listener to the connection
  3. override/implement the method required to handle specific events; keep in mind that ManagerEventListener interface define only the
    onManagerEvent(ManagerEvent event)
    method (called when an event is received) while AbstractManagerEventListener already implements this method and defines all the required methods to manage every specific event (but this methods are empty and should be overridded)
  4. In this example, we will do all the connection setup in the constructor. Here’s the full code of the test class. Please note that this is just for demonstration, so forgive the *ugly* endless loop in the main method. Moreover, I have only overridden a couple of methods related to MeetMe join and leave events, but in the org.asteriskjava.manager.event package you may see all of them.

    package net.cardosi.asterisk.ami;
    
    import org.asteriskjava.manager.AbstractManagerEventListener;
    import org.asteriskjava.manager.ManagerConnection;
    import org.asteriskjava.manager.ManagerConnectionFactory;
    import org.asteriskjava.manager.event.MeetMeJoinEvent;
    import org.asteriskjava.manager.event.MeetMeLeaveEvent;
    
    public class AMIListenerTest extends AbstractManagerEventListener {
    
    	public AMIListenerTest() {
    		// Instantiate the factory
    		ManagerConnectionFactory factory = new ManagerConnectionFactory(
    				"localhost", "admin", "secret5");
    		// Retrieve the connection from the factory
    		ManagerConnection managerConnection = factory.createManagerConnection();
    		try {
    			// login to Asterisk
    			managerConnection.login();
    			// Add this object as listener to the connection
    			managerConnection.addEventListener(this);
    		} catch (Exception e) {
    			// Manage exception
    			e.printStackTrace();
    		}
    	}
    
    	@Override
    	protected void handleEvent(MeetMeJoinEvent event) {
    		System.out.println(event);
    	}
    
    	@Override
    	protected void handleEvent(MeetMeLeaveEvent event) {
    		System.out.println(event);
    	}
    
    	public static void main(String[] args) {
    		new AMIListenerTest();
    		while (true) {
    		}
    	}
    }
    

    To use the Live API EventListener, you may:

      implement the AsteriskServerListener interface
      extend the AbstractAsteriskServerListener class

    We will use the second method in this tutorial. The step are pretty similar as in the previous example, but now we will show you how to directly instantiate a DefaultAsteriskServer. With the Live API you can only listen for a small set of Asterisk events, but each of this event will provide a LiveObject to which, in turn, you may add a PropertyChangeListener to monitor what happen to it. The only method required to implement PropertyChangeListener is

    public void propertyChange(PropertyChangeEvent propertyChangeEvent).
    Our example class will both extend AbstractAsteriskServerListener and implement PropertyChangeListener:

    package net.cardosi.asterisk.live;
    
    import java.beans.PropertyChangeEvent;
    import java.beans.PropertyChangeListener;
    import org.asteriskjava.live.AbstractAsteriskServerListener;
    import org.asteriskjava.live.AsteriskChannel;
    import org.asteriskjava.live.AsteriskQueueEntry;
    import org.asteriskjava.live.AsteriskServer;
    import org.asteriskjava.live.DefaultAsteriskServer;
    import org.asteriskjava.live.MeetMeUser;
    import org.asteriskjava.live.internal.AsteriskAgentImpl;
    
    public class LiveListenerTest extends AbstractAsteriskServerListener implements
    		PropertyChangeListener {
    
    	public LiveListenerTest() {
    		AsteriskServer asteriskServer = new DefaultAsteriskServer("localhost",   "admin", "secret5");
    		asteriskServer.addAsteriskServerListener(this);
    	}
    
    	public void onNewAsteriskChannel(AsteriskChannel channelParam) {
    		System.out.println(channelParam);
    		channelParam.addPropertyChangeListener(this);
    	}
    
    	public void onNewMeetMeUser(MeetMeUser userParam) {
    		System.out.println(userParam);
    		userParam.addPropertyChangeListener(this);
    	}
    
    	public void onNewAgent(AsteriskAgentImpl agentParam) {
    		System.out.println(agentParam);
    		agentParam.addPropertyChangeListener(this);
    	}
    
    	public void onNewQueueEntry(AsteriskQueueEntry entryParam) {
    		System.out.println(entryParam);
    		entryParam.addPropertyChangeListener(this);
    	}
    
    	public void propertyChange(PropertyChangeEvent propertyChangeEvent) {
    		System.out.println(propertyChangeEvent);
    	}
    
    	public static void main(String[] args) {
    		new LiveListenerTest();
    		while (true) {
    		}
    	}
    }
    

    “A “PropertyChange” event gets delivered whenever a bean changes a “bound” or “constrained” property. A PropertyChangeEvent object is sent as an argument to the PropertyChangeListener and VetoableChangeListener methods.” (quoted). So, whenever the state of the LiveObject change, you may be informed of what and how changed. If you try the above code, for example connecting a client to a MeetMe conference, you will see all the events and notifications that will be triggered.

    … and that’s all, folks!!!
    I hope you enjoyed this tutorial.
    Bye

Tutorial on Linux + Asterisk +MySQL + Java – part 4

This part of the tutorial keep going with the explanation of the Asterisk-Java’ Live API.

We concluded the previous part of the tutorial talking of Manager API. Now it is time to talk of the Live API.
This API provide an *abstraction* layer over the Manager API, making available *objects* representing Asterisk’ components, instead of directly send commands to the AMI. Some of this *objects* are AsteriskChannel (it represents a channel inside Asterisk), AsteriskServer (the Asterisk server itself) and so on. You may find them inside the org.asteriskjava.live package.
In the following example we tell asterisk to call a (connected) SIP client and, if it answer, redirect it to a specific extension. First of all we have to login to the AMI, as we did in the previous part:

        ManagerConnectionFactory factory = new ManagerConnectionFactory(
				"localhost", "admin", "secret5");
	// Retrieve the connection from the factory
	ManagerConnection managerConnection = factory.createManagerConnection();
	try {
		// login to Asterisk
		managerConnection.login();
         ...

then we instantiate an AsteriskServerImpl that “represent” the Asterisk server:

AsteriskServerImpl asteriskServer = new AsteriskServerImpl(managerConnection);

Now, we want to be sure to call a client actually connected, so:

  1. first, we retrieve the list or configured peers
  2. iterate over them and select the first (if any) is currently connected
  3. if no peer is connected, the method return
  4. otherwise, we retrieve the peer’ channel
  5. and then call it and, eventually, redirect it to a specific context/extension/priority
        // Retrieve the list of configured peers
	List peerEntries = asteriskServer.getPeerEntries();
	PeerEntryEvent peerToCall = null;
        // Iterate over peers
	for (PeerEntryEvent peerEntry : peerEntries) {
		String status = peerEntry.getStatus();
                // Select the first one connected and exit the loop
		if (status != null && status.contains("OK")) {
		         peerToCall = peerEntry;
                         break;
		}
	}
        // If no peer is connected exit from method
	if (peerToCall == null) {
		return;
	}
        // Build the channel name
	String channelName = peerToCall.getChannelType();
	channelName += "/" + peerToCall.getObjectName();
        // Call the client and, eventually, redirect it
	asteriskServer.originateToExtension(channelName, "default", "8600", 1, 10000);

originateToExtension is just one of the methods that AsteriskServerImpl offers. As a side note, it throws an Exception if the “originate” action cannot be sent to the Asterisk server or if the channel name is unavailable. A nice thing is that this method returns an AsteriskChannel that may be used for funny stuff, like for example send a DTMF to the peer:

         AsteriskChannel asteriskChannel = asteriskServer.originateToExtension(channelName, "default", "8600", 1, 10000);
	asteriskChannel.playDtmf("1");

In my dialplan 8600@default is a MeetMe conference – I am using this for the example so to be sure that the channel is still active when we send the DTMF.
As I have already told you at the beginning of the tutorials (or maybe not, I’m getting older), I am not an Asterisk GURU, but here I would just put a quick checklist to verify that you have a correct MeetMe configuration (thanks Jorge for your suggestion):

  1. Meetme depends on dahdi; so dahdi module should be loaded (on linux, check with lsmod | grep dahdi); moreover, dahdi channels must be owned by the user that *actually* run asterisk – to be clear, I start asterisk as root but the asterisk process is owned by “asterisk” user (ps aux | grep asterisk); so, all /dev/dahdi “devices” (and the directory itself) must be owned by “asterisk” user;
  2. inside meetme.conf you just have to put a line like “conf => (number of conference, ex. 1234);
  3. of course, you have to load MeetMe module on Asterisk (you can check it for example from console with “meetme list” command; it actually shows only active conferences, so it is normal you have an empty return);
  4. you have to create a dialplan extensions that point to the meetme conference (ex. exten => 8600,1,Meetme(1234));
  5. verify that everything is working with a sip phone calling the extension (in the example, 8600) – you should be able to join the conference (and now the conference should appear with the “meetme list” command).

And here’s the complete code to use:

package net.cardosi.asterisk.live;

import java.util.List;
import org.asteriskjava.live.AsteriskChannel;
import org.asteriskjava.live.internal.AsteriskServerImpl;
import org.asteriskjava.manager.ManagerConnection;
import org.asteriskjava.manager.ManagerConnectionFactory;
import org.asteriskjava.manager.event.PeerEntryEvent;

public class LiveTest {

	public static void main(String[] args) {
		// Retrieve the factory with connection parameters
		ManagerConnectionFactory factory = new ManagerConnectionFactory(
				"localhost", "admin", "secret5");
		// Retrieve the connection from the factory
		ManagerConnection managerConnection = factory.createManagerConnection();
		try {
			// login to Asterisk
			managerConnection.login();
			// Instantiate a new AsteriskServer
			AsteriskServerImpl asteriskServer = new AsteriskServerImpl(
					managerConnection);
			// Retrieve the list of configured peers
			List peerEntries =                                asteriskServer.getPeerEntries();
			PeerEntryEvent peerToCall = null;
			// Iterate over peers
			for (PeerEntryEvent peerEntry : peerEntries) {
				String status = peerEntry.getStatus();
				// Select the first one connected and exit the loop
				if (status != null && status.contains("OK")) {
					peerToCall = peerEntry;
					break;
				}
			}
			// If no peer is connected exit from method
			if (peerToCall == null) {
				return;
			}
			String channelName = peerToCall.getChannelType();
			channelName += "/" + peerToCall.getObjectName();
			System.out.println("Calling " + channelName);
			AsteriskChannel asteriskChannel = 
                           asteriskServer.originateToExtension(channelName, "default",   "8600", 1, 10000);
			Thread.sleep(10000);
			System.out.println("Playing '1' on " + channelName);
			asteriskChannel.playDtmf("1");
		} catch (Exception e) {
			// Manage exception
			e.printStackTrace();
		}
	}
}

To test the code, you have to verify the settings like in the previous part. Moreover, you have to configure an extension in your dialplan that keep the channel active (MeetMe, ConfBridge, ParkedCall).
And that’s all for now. In the next part we will have some fun with EventListeners. Don’t miss that!!! Bye