Tutorial on Microservices with Kotlin, SpringBoot, Akka and Docker – part 7

Intro

In the last post we implemented the akkaclient service, so to have a simple Akka message exchange. What was missing is the AkkaCluster implementation, the topic of this last part of the tutorial.
My goal overall goal is to integrate as much as possible the “Akka” world with the “Eureka” one, having only one main “configuration” point, represented by the “RegistrationService”: akka services will have to retrieve cluster informations from it.
As always you may download the full code from the repository:

 git clone https://github.com/gitgabrio/container-microservices-tutorial-7.git

akkacluster-service

Let’s add the module declaration to the parent pom:

...
   </parent>
    <modules>
        <module>servicecommon</module>
        <module>registrationservice</module>
        <module>persistenceservice</module>
        <module>timeconsumingservice</module>
        <module>configurationservice</module>
        <module>akkaclusterservice</module>
        <module>actorserverservice</module>
        <module>actorclientservice</module>
        <module>docker</module>
    </modules>
    <properties>
...

This is the akkacluster-service pom:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 <parent>
  <groupId>net.microservices.tutorial</groupId>
  <artifactId>container-microservice</artifactId>
  <version>0.1</version>
  <relativePath>../</relativePath>
 </parent>
 <modelVersion>4.0.0</modelVersion>

 <artifactId>akkaclusterservice</artifactId>
 <properties>
  <akka.version>2.5.3</akka.version>
  <!-- CONFIGURATIONS -->
  <start-class>net.microservices.tutorial.akkaclusterservice.AkkaClusterServer</start-class>
  <finalName>akkaclusterservice-${project.version}</finalName>
  <service.port>7777</service.port>
  <rs.port>1111</rs.port>
  <rs.ip>localhost</rs.ip>
  <akka.port>12550</akka.port>
  <seed.port1>22552</seed.port1>
  <seed.port2>22553</seed.port2>
 </properties>
 <build>
  <finalName>${finalName}</finalName>
 </build>
 <dependencies>
  <dependency>
   <groupId>net.microservices.tutorial</groupId>
   <artifactId>servicecommon</artifactId>
   <version>0.1</version>
  </dependency>
  <dependency>
   <groupId>com.typesafe.akka</groupId>
   <artifactId>akka-actor_2.11</artifactId>
   <version>${akka.version}</version>
  </dependency>
  <dependency>
   <groupId>com.typesafe.akka</groupId>
   <artifactId>akka-remote_2.11</artifactId>
   <version>${akka.version}</version>
  </dependency>
  <dependency>
   <groupId>com.typesafe.akka</groupId>
   <artifactId>akka-cluster_2.11</artifactId>
   <version>${akka.version}</version>
  </dependency>
  <dependency>
   <groupId>com.typesafe.akka</groupId>
   <artifactId>akka-cluster-metrics_2.11</artifactId>
   <version>${akka.version}</version>
  </dependency>
 </dependencies>
</project>

As you may see, it is almost the same as the other akka modules, except for the akka-cluster specific dependencies. Please note that these dependencies have been added also to the actorserverservice and actorclientservice modules.

application-conf


akka {
 actor {
  provider = "cluster"
  serializers {
   proto = "akka.remote.serialization.ProtobufSerializer"
  }
 }
 remote {
  netty.tcp {
   hostname = localhost
   port = 4324
  }
  retry-gate-closed-for = 10 s
 }

 cluster {
  seed-nodes = [
   "akka.tcp://ClusterSystem@127.0.0.1:2551",
   "akka.tcp://ClusterSystem@127.0.0.1:2552"]
 }
 log-sent-messages = on
 log-received-messages = on

# The length of time to gate an address whose name lookup has failed
# or has explicitly signalled that it will not accept connections
# (remote system is shutting down or the requesting system is quarantined).
# No connection attempts will be made to an address while it remains
# gated. Any messages sent to a gated address will be directed to dead
# letters instead. Name lookups are costly, and the time to recovery
# is typically large, therefore this setting should be a value in the
# order of seconds or minutes.
 gate-invalid-addresses-for = 10 s
}

The application-conf has been modified to use “cluster” as provider. This modification has been done also to the other akka modules. Specific for akkacluster is the cluster configuration

...
cluster {
  seed-nodes = [
   "akka.tcp://ClusterSystem@127.0.0.1:2551",
   "akka.tcp://ClusterSystem@127.0.0.1:2552"]
 }
...

where we define the seed-nodes address. Here you may read the general instructions for Cluster configuration.

With the above in files in place, we could already start our bare-bone cluster. With the following code we will add some functionalities to it.

code

We want to show in the home page some informations about the cluster, namely:
1) Registered Nodes status
2) Cluster Metrics (CPU and memory usage)

The controller will take this informations from the autowired service class and will inject them in the template:
net/microservices/tutorial/akkaclusterservice/controllers/HomeController.kt

...
@Controller
class HomeController {

    @Autowired
    var homeService: HomeService? = null

    @RequestMapping("/")
    fun home(model: Model): String {
        val heapMemory = homeService?.heapMemory ?: "UNKNOWN"
        val processors = homeService?.processors ?: "UNKNOWN"
        val loadAverage = homeService?.loadAverage ?: "UNKNOWN"
        val nodes = homeService?.nodeStatusMap ?: mutableMapOf()
        model.addAttribute("heapMemory", heapMemory)
        model.addAttribute("processors", processors)
        model.addAttribute("loadAverage", loadAverage)
        model.addAttribute("nodes", nodes)
        return "index"
    }

}

The “Service” class will be used to store this informations

net/microservices/tutorial/akkaclusterservice/services/HomeService.kt

...
@Service
open class HomeService {

    enum class NODE_STATUS {
        UP,
        UNREACHABLE,
        REMOVED
    }
    var heapMemory: Double = 0.00
    var processors: Int = 0
    var loadAverage: Double = 0.00
    var nodeStatusMap: MutableMap<Address, NODE_STATUS> = mutableMapOf()
}

For the Nodes status we have defined an enum and a Map with the Node‘s address and status. Using the Address as key allow us to have only one entry for Nodes restarted on the same address.
The other variables are used for the metrics.

Now, we have to populate all this values.

First of all we are going to monitor when Nodes are being added/removed. For this, we create an Actor that registers itself to cluster events and reacts to specific messages:

net/microservices/tutorial/akkaclusterservice/actors/ClusterListenerActor.kt

class ClusterListenerActor(val homeService: HomeService) : AbstractActor() {
...

 //subscribe to cluster changes
 override fun preStart() {
  cluster.subscribe(self(), ClusterEvent.initialStateAsEvents(),
   MemberEvent::class.java, UnreachableMember::class.java)
 }

 //re-subscribe when restart
 override fun postStop() {
  cluster.unsubscribe(self())
 }

 override fun createReceive(): AbstractActor.Receive {
        return receiveBuilder()
                .match(MemberUp::class.java) { mUp ->
                    log.info("Member is Up: {}", mUp.member())
                    homeService.nodeStatusMap.put(mUp.member().uniqueAddress().address(), HomeService.NODE_STATUS.UP)
                }
                .match(UnreachableMember::class.java) { mUnreachable ->
                    log.info("Member detected as unreachable: {}", mUnreachable.member())
                    homeService.nodeStatusMap.put(mUnreachable.member().uniqueAddress().address(), HomeService.NODE_STATUS.UNREACHABLE)
                }
                .match(MemberRemoved::class.java) { mRemoved ->
                    log.info("Member is Removed: {}", mRemoved.member())
                    homeService.nodeStatusMap.put(mRemoved.member().uniqueAddress().address(), HomeService.NODE_STATUS.REMOVED)
                }
                .match(MemberEvent::class.java) { message ->
                    log.info("Member is {}: {}", message.member().status(), message.member())
            // ignore
                }
                .build()
    }
}

Whenever a new Node registers itself to the cluster or change its status, the actor updates HomeService’s nodeStatusMap accordingly.

Next, we want to show some metrics for the cluster. For this, we implement another actor that reacts to ClusterMetricsChanged messages:

net/microservices/tutorial/akkaclusterservice/actors/MetricsListenerActor.kt:

 ...
class MetricsListenerActor(val homeService: HomeService) : AbstractActor() {
 ...

  // Subscribe unto ClusterMetricsEvent events.
  override fun preStart() {
   extension.subscribe(self())
  }

  // Unsubscribe from ClusterMetricsEvent events.
  override fun postStop() {
   extension.unsubscribe(self())
  }

  override fun createReceive(): AbstractActor.Receive {
   return receiveBuilder()
    .match(ClusterMetricsChanged::class.java) { clusterMetrics ->
      for (nodeMetrics in clusterMetrics.nodeMetrics) {
       if (nodeMetrics.address() == cluster.selfAddress()) {
        logHeap(nodeMetrics)
        logCpu(nodeMetrics)
       }
      }
    }
    .match(CurrentClusterState::class.java) { message ->
      // Ignore.
    }
    .build()
  }

  internal fun logHeap(nodeMetrics: NodeMetrics) {
   val heap = StandardMetrics.extractHeapMemory(nodeMetrics)
    if (heap != null) {
     val heapMemory = heap.used().toDouble() / 1024.0 / 1024.0
     homeService.heapMemory = heapMemory
   }
  }

  internal fun logCpu(nodeMetrics: NodeMetrics) {
    val cpu = StandardMetrics.extractCpu(nodeMetrics)
     if (cpu != null && cpu.systemLoadAverage().isDefined) {
      homeService.processors = cpu.processors()
      homeService.loadAverage = cpu.systemLoadAverage().get() as Double
  }
 }

This actor will modify the others HomeService’s variables.

All this functionalities are orchestrated inside the configuration:
net/microservices/tutorial/akkaclusterservice/configurations/AkkaClusterConfiguration.kt:

@Configuration
@EnableDiscoveryClient
@ComponentScan("net.microservices.tutorial.akkaclusterservice")
open class AkkaClusterConfiguration {

    private val SYSTEM_NAME = "ClusterSystem"

    @Value("\${eureka.instance.metadata-map.port}")
    var akkaPort: Int = 0

    @Value("\${eureka.instance.metadata-map.seed_port1}")
    var seedPort1: Int = 0

    @Value("\${eureka.instance.metadata-map.seed_port2}")
    var seedPort2: Int = 0

    @Autowired
    private val eurekaClient: EurekaClient? = null


    @Bean
    open fun akkaClusterSystem(homeService: HomeService): ActorSystem {
        startClusterListener(seedPort1, "clusterListenerActor1", homeService)
        startClusterListener(seedPort2, "clusterListenerActor2", homeService)
        val defaultApplication: Config = getConfig(akkaPort)
        val toReturn = ActorSystem.create(SYSTEM_NAME, defaultApplication)
        return toReturn
    }

    @Bean
    open fun metricsListenerActor(akkaClusterSystem: ActorSystem, homeService: HomeService): ActorRef {
        val toReturn = akkaClusterSystem.actorOf(Props.create(MetricsListenerActor::class.java, homeService), "metricsListener")
        return toReturn
    }

    @Bean
    open fun homeController(): HomeController {
        return HomeController()
    }

    private fun startClusterListener(seedPort: Int, actorName: String, homeService: HomeService) {
        val defaultApplication: Config = getConfig(seedPort)
        val system = ActorSystem.create(SYSTEM_NAME, defaultApplication)
        system.actorOf(Props.create(ClusterListenerActor::class.java, homeService), actorName)
    }

    private fun getConfig(tcpPort: Int): Config {
        val hostName = eurekaClient?.applicationInfoManager?.info?.ipAddr ?: "hostname"
        val seedNodes = listOf("akka.tcp://$SYSTEM_NAME@$hostName:$seedPort1", "akka.tcp://$SYSTEM_NAME@$hostName:$seedPort2")
        val toReturn: Config = ConfigFactory.defaultApplication()
                .withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(hostName))
                .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(tcpPort))
                .withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromIterable(seedNodes))
        return toReturn
    }

}

What’s interesting here is the Cluster startup.
First we instantiate two ClusterListenerActors, one for each seed port

...

    @Bean
    open fun akkaClusterSystem(homeService: HomeService): ActorSystem {
        startClusterListener(seedPort1, "clusterListenerActor1", homeService)
        startClusterListener(seedPort2, "clusterListenerActor2", homeService)
      ...
    }
...

    private fun startClusterListener(seedPort: Int, actorName: String, homeService: HomeService) {
        val defaultApplication: Config = getConfig(seedPort)
        val system = ActorSystem.create(SYSTEM_NAME, defaultApplication)
        system.actorOf(Props.create(ClusterListenerActor::class.java, homeService), actorName)
    }
...

and then we start the cluster with the two seed nodes:

...
    @Bean
    open fun akkaClusterSystem(homeService: HomeService): ActorSystem {
       ...
        val defaultApplication: Config = getConfig(akkaPort)
        val toReturn = ActorSystem.create(SYSTEM_NAME, defaultApplication)
        return toReturn
    }

   ...

    private fun getConfig(tcpPort: Int): Config {
        val hostName = eurekaClient?.applicationInfoManager?.info?.ipAddr ?: "hostname"
        val seedNodes = listOf("akka.tcp://$SYSTEM_NAME@$hostName:$seedPort1", "akka.tcp://$SYSTEM_NAME@$hostName:$seedPort2")
        val toReturn: Config = ConfigFactory.defaultApplication()
                .withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(hostName))
                .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(tcpPort))
                .withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromIterable(seedNodes))
        return toReturn
    }

Note that we are retrieving the hostName from the eurekaClient, so that other Akka services will be able to retrieve it through the Registration.

To see how, this is how the ActorServerConfiguration has been modified:
net/microservices/tutorial/actorserverservice/configurations/ActorServerConfiguration.kt:

@Configuration
@EnableDiscoveryClient
@ComponentScan("net.microservices.tutorial.actorserverservice")
open class ActorServerConfiguration {

    private val SYSTEM_NAME = "ClusterSystem"

    private var logger = Logger.getLogger(ActorServerConfiguration::class.java.simpleName)

    @Value("\${eureka.instance.metadata-map.port}")
    var akkaPort: Int = 0

    @Value("\${akkaclusterservicename.name}")
    var akkaClusterServiceName: String? = null

    @Autowired
    private val eurekaClient: EurekaClient? = null

    @Bean
    open fun actorSystem(): ActorSystem? {
        val instanceInfo = getRemoteInstanceInfo()
        if (instanceInfo != null) {
            logger.info("instanceInfo ${instanceInfo.hostName} ")
            return getSystem(instanceInfo)
        } else {
            logger.severe("instanceInfo NOT FOUND ")
            return null
        }
    }

    private fun getSystem(instanceInfo: InstanceInfo): ActorSystem {
        val defaultConfiguration = getDefaultConfiguration(instanceInfo)
        val system = ActorSystem.create(SYSTEM_NAME, defaultConfiguration)
        system.actorOf(Props.create(ServerActor::class.java), "serverActor")
        return system
    }

    private fun getRemoteInstanceInfo(): InstanceInfo? {
        logger.info("akkaClusterServiceName $akkaClusterServiceName")
        val akkaClusterService: Application? = eurekaClient?.getApplication(akkaClusterServiceName)
        logger.info("akkaClusterService ? " + akkaClusterService!!.name)
        akkaClusterService?.shuffleAndStoreInstances(true)
        val instances: List<InstanceInfo>? = akkaClusterService?.instances
        var toReturn: InstanceInfo? = null
        if (instances != null && instances.size > 0) {
            toReturn = instances[0]
        }
        return toReturn
    }

    private fun getDefaultConfiguration(instanceInfo: InstanceInfo): Config {
        val hostName = eurekaClient?.applicationInfoManager?.info?.ipAddr ?: "hostname"
        logger.info("hostName $hostName ")
        val clusterServiceHost = instanceInfo.ipAddr
        logger.info("clusterServiceHost $clusterServiceHost")
        val seedNodes = instanceInfo.metadata.filter { kv -> kv.key.startsWith("seed") }.map { kv -> "akka.tcp://$SYSTEM_NAME@$clusterServiceHost:${kv.value}" }
        seedNodes.forEach {
            seedNode -> logger.info("seedNode $seedNode")
        }
        var toReturn: Config = ConfigFactory.defaultApplication()
                .withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(hostName))
                .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(akkaPort))
        if (!seedNodes.isEmpty()) {
            toReturn = toReturn
                    .withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromIterable(seedNodes))
        }
        return toReturn
    }
}

First, we find the InstanceInfo of the ClusterService:

...

    @Bean
    open fun actorSystem(): ActorSystem? {
        val instanceInfo = getRemoteInstanceInfo()
        ...
    }

    ...

    private fun getRemoteInstanceInfo(): InstanceInfo? {
        logger.info("akkaClusterServiceName $akkaClusterServiceName")
        val akkaClusterService: Application? = eurekaClient?.getApplication(akkaClusterServiceName)
        logger.info("akkaClusterService ? " + akkaClusterService!!.name)
        akkaClusterService?.shuffleAndStoreInstances(true)
        val instances: List<InstanceInfo>? = akkaClusterService?.instances
        var toReturn: InstanceInfo? = null
        if (instances != null && instances.size > 0) {
            toReturn = instances[0]
        }
        return toReturn
    }
...

With this we can then retrieve the address of the remote cluster:

...

    @Bean
    open fun actorSystem(): ActorSystem? {
       ...
            logger.info("instanceInfo ${instanceInfo.hostName} ")
            return getSystem(instanceInfo)
        ...
    }

    private fun getSystem(instanceInfo: InstanceInfo): ActorSystem {
        val defaultConfiguration = getDefaultConfiguration(instanceInfo)
        val system = ActorSystem.create(SYSTEM_NAME, defaultConfiguration)
        system.actorOf(Props.create(ServerActor::class.java), "serverActor")
        return system
    }

   ...

    private fun getDefaultConfiguration(instanceInfo: InstanceInfo): Config {
        val hostName = eurekaClient?.applicationInfoManager?.info?.ipAddr ?: "hostname"
        logger.info("hostName $hostName ")
        val clusterServiceHost = instanceInfo.ipAddr
        logger.info("clusterServiceHost $clusterServiceHost")
        val seedNodes = instanceInfo.metadata.filter { kv -> kv.key.startsWith("seed") }.map { kv -> "akka.tcp://$SYSTEM_NAME@$clusterServiceHost:${kv.value}" }
        seedNodes.forEach {
            seedNode -> logger.info("seedNode $seedNode")
        }
        var toReturn: Config = ConfigFactory.defaultApplication()
                .withValue("akka.remote.netty.tcp.hostname", ConfigValueFactory.fromAnyRef(hostName))
                .withValue("akka.remote.netty.tcp.port", ConfigValueFactory.fromAnyRef(akkaPort))
        if (!seedNodes.isEmpty()) {
            toReturn = toReturn
                    .withValue("akka.cluster.seed-nodes", ConfigValueFactory.fromIterable(seedNodes))
        }
        return toReturn
    }
}

The parameter instanceInfo passed to getDefaultConfiguration is actually the InstanceInfo of the ClusterService, from which we retrieve the ipAddress and the seedNodes ports.

When the ServerActor starts, it registers itself to the cluster, and when stops, it unregisters:
net/microservices/tutorial/actorserverservice/actors/ServerActor.kt:

...
//subscribe to MemberUp events
    override fun preStart() {
        cluster.subscribe(self(), ClusterEvent.MemberUp::class.java)
    }

    //re-subscribe when restart
    override fun postStop() {
        cluster.unsubscribe(self())
    }
...

Moreover, it listen for ClusterEvent, so that whenever an Akka Member with the Client role join the cluster, the ServerActor send to it a ServerActorRegistration message:

...
 override fun createReceive(): Receive {
        return ReceiveBuilder()
                ...
                .match(ClusterEvent.CurrentClusterState::class.java) {
                    state ->
                    state.members
                            .filter {
                                member ->
                                member.status().equals(MemberStatus.up())
                            }
                            .forEach {
                                member ->
                                register(member)
                            }
                }
                .match(ClusterEvent.MemberUp::class.java) {
                    mUp -> register(mUp.member())
                }
                .build()
    }

    ...

    private fun register(member: Member) {
        if (member.hasRole("Client")) {
            context.actorSelection("${member.address()}/user/clientActor")
                    .tell(ServerActorRegistration(), self())
        }
    }
...

In this way, the ClientActor will have a reference of the ServerActor to talk with.

The ActorClientService configuration is almost the same as of the ActoreServerService‘s one.
The ClientActor is simpler then before, though, since it does not have to retrieve the information about the ServerActor.
When it is in inactive state, it just listen and wait for an ActorServer to appear:
net/microservices/tutorial/actorclientservice/actors/ClientActor.kt:

...
  internal var inactive: Receive = receiveBuilder()
            .match(ServerActorRegistration::class.java) {
                remoteActor = sender
                context.watch(remoteActor)
                context.become(active, true)
...

When an ActorServer joins the cluster, the client switch to the active state, and begin exchanging messages with it:

 internal var active: Receive = receiveBuilder()
            .match(AkkaMessage::class.java) { s ->
                logger.info("Received " + s)
                remoteActor?.tell(s, self)
                pendingMessages.put(s.id, s)
            }
            .match(AkkaResponse::class.java) { s ->
                logger.info("Received " + s)
                pendingMessages.remove(s.id)
                logger.info("We still have ${pendingMessages.size} pending messages")
            }
            ...

image creation

Let’s update the docker configuration.
docker/pom.xml:

...
   <properties>
        <!-- PLUGINS -->
        <docker.maven.plugin.fabric8.version>0.21.0</docker.maven.plugin.fabric8.version>
        <!-- CONFIGURATIONS -->
        <docker.repo>(YOUR-REPO-NAME)</docker.repo>
        <!-- services properties -->
        <registration.service.port>1111</registration.service.port>
        <persistence.service.port>2222</persistence.service.port>
        <timeconsuming.service.port>3333</timeconsuming.service.port>
        <configuration.service.port>4444</configuration.service.port>
        <actorserver.service.port>5555</actorserver.service.port>
        <actorclient.service.port>6666</actorclient.service.port>
        <akkacluster.service.port>7777</akkacluster.service.port>
        <!-- Images to build and run - default to all-->
        <images>registrationservice, persistenceservice, timeconsumingservice, configurationservice,
            actorserverservice, actorclientservice, akkaclusterservice
        </images>
    </properties>

    <profiles>
        <profile>
            <id>configuration</id>
            <properties>
                <images>registrationservice, persistenceservice, timeconsumingservice, configurationservice
                </images>
            </properties>
        </profile>
        <profile>
            <id>akka</id>
            <properties>
                <images>registrationservice, akkaclusterservice, actorclientservice, actorserverservice
                </images>
            </properties>
        </profile>
    </profiles>

    <dependencies>
        ...
        <dependency>
            <groupId>net.microservices.tutorial</groupId>
            <artifactId>akkaclusterservice</artifactId>
            <version>0.1</version>
        </dependency>
    </dependencies>
...

You will find the AkkaClusterService image in the sources.

After installing the akkaclusterservice artifact and starting docker with the akka profile,
we should have all the akka services registered:

and the akkacluster service running here
.

Last, in the ActorServerService service a new rest url has been added to show the currently registered users:

You may find it here

Conclusion

You will find the image definition (and all the other missing stuff) in the project:

git clone https://github.com/gitgabrio/container-microservices-tutorial-7.git

Final Thoughts

Well, it has been an interesting tutorial, at least for me.
I hope you liked reading it as much as I liked writing it.
I would like to thanks those friends that helped me: I owe you a beer!!!
And, as always…

any comment and suggestion will be greatly appreciated!!!

Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out /  Change )

Google+ photo

You are commenting using your Google+ account. Log Out /  Change )

Twitter picture

You are commenting using your Twitter account. Log Out /  Change )

Facebook photo

You are commenting using your Facebook account. Log Out /  Change )

Connecting to %s