Reactive or asynchronous networking code produces maximum throughput using the least amount of resources. If you need to communicate with a web server the play.api.libs.ws.WS
class will let you write a reactive client. This class wraps over the AsyncHttpClient Java library which uses Java NIO under the covers. But if you need to write a custom TCP client that communicates with a server using a proprietary protocol then you are on your own. Fortunately the Akka actor framework can significantly simplify this process. In this article we will develop a simple reactive TCP client using Akka.
If you have a Play project then Akka is already there as a dependency. You don’t have to manually add Akka as a dependency.
The Business Requirements
We will write a client that will connect to a server and send a short request message. It will then receive a short response. The messages are so short that we can expect to read or write them in a single read or write call without overflowing the networking buffer of the OS. We make this assumption to keep our client code as simple as possible. In real life partial reads and writes are definite possibilities. We are not going to address that here.
In this article our client will send a HTTP request to http://www.example.com. We use HTTP so that you can try the code out without having to write your own server. The basic principles will be same even for a proprietary server.
The API will be very similar to the WS
class mentioned above. The client should return a Future
. This is necessary because Play requires an asynchronous controller to return a Future[Result]
.
Getting to Know Akka Actors
We will develop the TCP client code in an actor class. Let’s take a look at some of the unique aspects of an actor.
Actors Do Things
An actor encapsulates actions. It extends the akka.actor.Actor
trait. All the action logic goes inside the receive
method. A basic skeleton actor will be like this:
import akka.actor.Actor
class MyActor(someState: String) extends Actor {
def receive = {
}
}
Actors are Discovered and Not Instantiated
You never instantiate an actor class using new
. You discover an actor instance either in the local process or in another machine in the network. There are several ways to discover an actor using akka.actor.ActorSystem
. Here we will only discuss how to discover an actor in the local process. First we obtain a reference to an ActorSystem
. You can use any name for your system.
val sys = ActorSystem.create("MyActorSystem")
Then you use akka.actor.Props
to define how to locate the actor. We will supply the class name of the actor and any constructor argument.
val p = Props(classOf[MyActor], "Some data")
Finally, we discover the actor like this:
val actor = sys.actorOf(p)
You Interact with an Actor Using Messages
You never ever directly call a method of an actor. You ask an actor to do something by sending it a message. You send a message using the !
operator. Almost any type of object can be sent as a message.
actor ! "Some message" //String message
actor ! 20 //Int message
The actor receives these messages from its receive
method.
class MyActor(someState: String) extends Actor {
def receive = {
case str: String => println(s"String message: $str")
case i: Int => println(s"Int message: $i")
case _ => println("Bizarre message from outer space.")
}
}
OK, that is all you need to know to get started developing a TCP client actor.
Write the TCP Client
Create a class called TcpClient
. Add these import statements.
import java.net.InetSocketAddress
import akka.actor.Actor
import akka.io.{IO, Tcp}
import akka.util.ByteString
import scala.concurrent.Promise
import Tcp._
The client will need these state variables:
- The address of the server to connect to.
- The request message data.
- The
scala.concurrent.Promise
to complete when response comes back. We can obtain a Future
from the promise as we will see later.
So, the class definition will have these three state variables:
class TcpClient(remote: InetSocketAddress,
requestData: String,
thePromise: Promise[String]) extends Actor {
}
As soon as the actor is created we will like to establish connection with the server. This is done like this.
class TcpClient(remote: InetSocketAddress,
requestData: String,
thePromise: Promise[String]) extends Actor {
import context.system
println("Connecting.")
IO(Tcp) ! Connect(remote)
}
Basically IO(Tcp)
returns reference to an actor. We send it a Connect
message to ask it to setup a connection. A reference to our own actor (TcpClient
) is sent along with the Connect
message as an implicit argument.
After the connection is established or an error occurs the TCP actor will send us a message back. All we have to do is intercept these messages from our receive
method.
class TcpClient(remote: InetSocketAddress, requestData: String, thePromise: Promise[String]) extends Actor {
import context.system
println("Connecting.")
IO(Tcp) ! Connect(remote)
def receive = {
case CommandFailed(_: Connect) =>
println ("Connection failed.")
context stop self
case c @ Connected(remote, local) =>
println ("Connect succeeded.")
val connection = sender()
connection ! Register(self)
println("Sending request message.")
connection ! Write(ByteString(requestData))
case _ => println("Something else is up.")
}
}
A Connected
message signifies a successful connection. In that case we send the connection actor a Register
message to set our TcpClient
actor as receiver of various IO events (like socket is readable or writable). Next, we write the request message. This is done by sending a Write
message to the connection. Data is always represented using akka.util.ByteString
.
After this our actor will receive messages every time something interesting happens to the connection. We are specifically interested in these events:
- The attempt to write the request data has failed.
- The socket is readable indicating the server has written some response data.
- The server has closed the connection.
There are many other possible events that can happen. But we will ignore them for now.
To process these messages we use a separate receive method. This is defined using context become
. The full code for the actor will now look like this.
class TcpClient(remote: InetSocketAddress,
requestData: String,
thePromise: Promise[String]) extends Actor {
import context.system
println("Connecting")
IO(Tcp) ! Connect(remote)
def receive = {
case CommandFailed(_: Connect) =>
println ("Connect failed")
context stop self
case c @ Connected(remote, local) =>
println ("Connect succeeded")
val connection = sender()
connection ! Register(self)
println("Sending request early")
connection ! Write(ByteString(requestData))
context become {
case CommandFailed(w: Write) =>
println("Failed to write request.")
case Received(data) =>
println("Received response.")
thePromise.success(
data.decodeString("UTF-8"))
case "close" =>
println("Closing connection")
connection ! Close
case _: ConnectionClosed =>
println("Connection closed by server.")
context stop self
}
case _ => println("Something else is up.")
}
}
Note how we are fulfilling the promise once we receive a response message.
Normally a server closes connection. Here we allow someone to send a “close” String message to our actor to close the connection from the client side.
Use the Actor from a Play Action
I will assume that you know the basics of asynchronous controller development. If not read this official guide on the subject. Basically, the controller method needs to return a Future[play.api.mvc.Result]
.
Add these import statements to your controller.
import java.net.InetSocketAddress
import akka.actor.{Props, ActorSystem}
import play.api.libs.concurrent.Execution.Implicits._
import play.api.mvc._
import scala.concurrent.Promise
Importing play.api.libs.concurrent.Execution.Implicits
is necessary to add an implicit execution environment for the Akka framework.
The skeleton of our controller code will look like this.
class Application extends Controller {
def index = Action.async {
}
}
In our index
method, add these lines to discover the TcpClient actor.
val host = "example.com"
val promise = Promise[String]()
val props = Props(classOf[TcpClient],
new InetSocketAddress(host, 80),
s"GET / HTTP/1.1\r\nHost: ${host}\r\nAccept: */*\r\n\r\n",
promise)
val sys = ActorSystem.create("MyActorSystem")
val tcpActor = sys.actorOf(props)
Next, we obtain the Future
associated with the Promise
by calling promise.future
. Then we convert the Future[String]
into Future[Result]
and return it.
//Convert the promise to Future[Result]
promise.future map { data =>
tcpActor ! "close"
Ok(data)
}
The full code of the index
method will now look like this.
def index = Action.async {
val host = "example.com"
val promise = Promise[String]()
val props = Props(classOf[TcpClient],
new InetSocketAddress(host, 80),
s"GET / HTTP/1.1\r\nHost: ${host}\r\nAccept: */*\r\n\r\n", promise)
val sys = ActorSystem.create("MyActorSystem")
val tcpActor = sys.actorOf(props)
promise.future map { data =>
tcpActor ! "close"
Ok(data)
}
}
Fully Working Code
Get the complete sbt project from GitHub.
About Partial Writes
When you write to a socket your OS will store the data in a buffer and then send data from this buffer over the network interface hardware. The OS has a limited amount of space to store this data. If the written data is large or when there are many writes going on at the same time the OS can easily run out of buffer space. If that happens the OS won’t be able to fully store the written data in the available buffer space. The write call will do a partial write and will let you know how much data was written. The socket becomes unwritable while there is no more buffer space available. It is the responsibility of the program to retry writing the remaining data when the socket becomes writable again.
Akka helps you take care of this problem. When you write a chunk of data to a connection Akka will keep retrying until the entire chunk is fully written. Depending on how much data you tried to write and how busy the machine is it my take Akka several attempts and some time for it to fully write the data. At least from the application’s point of view you don’t have to worry about this. You do a single write call and Akka takes care of sending the data, in multiple attempts if needed. However there is a catch. Akka can only retry writing a single chunk of data. Any attempt to write another chunk of data while a previous chunk is still in the middle of getting fully written will fail. Basically the application code will need to wait for the previously written data to be fully written by Akka before it can write again. To deal with this restriction Akka can let your application know when a previous write has fully completed. This is done using an acknowledgment mechanism. Let’s see this in the event handling code of an actor. In the example below we write the HTTP request header and body in two separate write calls.
class TcpClient(remote: InetSocketAddress,
requestHeader: String,
requestBody: String,
thePromise: Promise[String]) extends Actor {
import context.system
println("Connecting")
IO(Tcp) ! Connect(remote)
case object MyAck extends Event
def receive = {
case CommandFailed(_: Connect) =>
println ("Connect failed")
context stop self
case c @ Connected(remote, local) =>
println ("Connect succeeded")
val connection = sender()
connection ! Register(self)
println("Sending request header.")
connection ! Write(ByteString(requestHeader), MyAck)
context become {
case CommandFailed(w: Write) =>
println("Failed to write request.")
case MyAck =>
println("Writing request body.")
connection ! Write(ByteString(requestBody))
case Received(data) =>
println("Received response.")
thePromise.success(
data.decodeString("UTF-8"))
case "close" =>
println("Closing connection")
connection ! Close
case _: ConnectionClosed =>
println("Connection closed by server.")
context stop self
}
case _ => println("Something else is up.")
}
}
Here, after we write the request header we have to wait for it to be fully written before we can write the body.