SchedulerActor

The relevant parts of the code are displayed and explained. Now let us see how to obtain price data:

def constructUrl(exchange: String): String =
{
"https://min-api.cryptocompare.com/data/histominute?fsym=BTC&tsym=USD&limit=23&aggregate=1&e=" + exchange
}

ConstructUrl returns a completely formed URL for the request to the Cryptocompare API. More details are given in section related to the API:

final val predictionActor = system.actorOf(Props(new PredictionActor(configuration, db)))
final val traderActor = system.actorOf(Props(new TraderActor(ws)))

Creates instances of PredictionActor and TraderActor:

override def receive: Receive = {

The Receive method is defined in the actor trait and has to be implemented. It is triggered when someone passes a message to this actor (Scheduler in our case):

case _ =>
val futureResponse=restClient.getPayload(constructUrl(exchange))

In the preceding code, case _ => means that we react to any message of any type and content. The first thing that is done is an async call to the Cryptocompare API by the URL specified before. This is done with the help of RestClient, which returns Future with the response JSON. After receiving the response (inside futureResponse on complete callback), .json is mapped into the custom case class CryptoCompareResponse:

case class CryptoCompareResponse(Response: String, Type: Int, Aggregated: Boolean, Data: List[OHLC],     FirstValueInArray: Boolean, TimeTo: Long,TimeFrom: Long)

The case class is similar to POJO (Plain Old Java Object) without the need to write constructors and getters/setters:

object CryptoCompareResponse {
implicit val cryptoCompareResponseReads = Json.reads[CryptoCompareResponse]
}

This companion object is required for mapping JSON into this class. The CryptocompareResponse object stores the output of the API—a list of OHLC data, time range of data and others which that are not relevant to us. The OHLC class corresponds to actual price data:

case class OHLC(time: Long, open: Double, 
high: Double,
low: Double,
close: Double,
volumefrom: Double,
volumeto: Double)

After the data is ready, prices are stored in the DB by calling storePriceData(cryptoCompareResponse). At first, it does a batch insert (using Anorm's BatchSQL) into the PRICE_STAGING table and re-inserts into PRICE with deduplication with respect to timestamp, as we are receiving overlapping price data:

val batch = BatchSql(
"""|INSERT INTO PRICE_STAGING(TIMESTAMP,EXCHANGE,PRICE_OPEN,PRICE_CLOSED,VOLUME_BTC,
VOLUME_USD)| VALUES({timestamp}, {exchange}, {priceOpen}, {priceClosed}, {volumeBTC}, {volumeUSD})""".stripMargin,transformedPriceDta.head,transformedPriceDta.tail:_*)
val res: Array[Int] = batch.execute() // array of update count
val reInsert = SQL(
"""
|INSERT INTO PRICE(TIMESTAMP, EXCHANGE, PRICE_OPEN, PRICE_CLOSED, VOLUME_BTC, VOLUME_USD)
|SELECT TIMESTAMP, EXCHANGE, PRICE_OPEN, PRICE_CLOSED, VOLUME_BTC, VOLUME_USD
|FROM PRICE_STAGING AS s
|WHERE NOT EXISTS (
|SELECT *
|FROM PRICE As t
|WHERE t.TIMESTAMP = s.TIMESTAMP
|)
""".stripMargin).execute()
Logger.debug("reinsert " + reInsert)

After storing into the DB, SchedulerActor transforms OHLC data into (timestamp, delta) tuples, where delta is (closePrice-openPrice). So the format is suitable for the ML model. The transformed data is passed as a message to PredictionActor with explicit waiting for a response. This is done by using the ? operator. We ask the prediction actor:

(predictionActor ? CryptoCompareDTOToPredictionModelTransformer.tranform(cryptoCompareResponse)).mapTo[CurrentDataWithShortTermPrediction].map {

Its response is mapped to the CurrentDataWithShortTermPrediction class and passed to TraderActor using the ! operator. Unlike ?, the ! operator does not require a response:

predictedWithCurrent =>
traderActor ! predictedWithCurrent}

This was basic a walkthrough of SchedulerActor. We read data from the Cryptocompare API, store it into the database, send to PredictionActor and wait for its response. Then we forward its response to TraderActor.

Now let's see what happens inside PredictionActor.