
Today, you will learn how to collect Bitcoin historical and live-price data. You will also learn to transform data into time series and train your model to make insightful predictions.

Historical and live-price data collection

We will use the Bitcoin historical price data from Kaggle. For real-time data, we will use the Cryptocompare API.

Historical data collection

For training the ML algorithm, there is a Bitcoin Historical Price Data dataset available to the public on Kaggle (version 10), which can be downloaded from its Kaggle page. It has 1-minute OHLC (open, high, low, close) data for BTC-USD pairs from several exchanges.

When the project began, data for most exchanges was available from January 1, 2012 to May 31, 2017; for the Bitstamp exchange, it was available until October 20, 2017 (as it was for Coinbase, although that dataset became available later):



Figure 1: The Bitcoin historical dataset on Kaggle

Note that you need to be a registered user and be logged in to download the file. The file we are using is bitstampUSD_1-min_data_2012-01-01_to_2017-10-20.csv. Let us look at the data it contains. It has eight columns:

  • Timestamp: The time elapsed in seconds since January 1, 1970 (Unix time). It is 1,325,317,920 for the first row and 1,325,317,980 for the second one. (Sanity check! The difference is 60 seconds.)
  • Open: The price at the opening of the time interval, that is, the price of the first trade that happened after Timestamp. For the first row (Timestamp 1,325,317,920), it is 4.39 dollars.
  • Close: The price at the closing of the time interval.
  • High: The highest price from all orders executed during the interval.
  • Low: The same as High but it is the lowest price.
  • Volume_(BTC): The sum of all Bitcoins that were transferred during the time interval. So, take all transactions that happened during the selected interval and sum up the BTC values of each of them.
  • Volume_(Currency): The sum of all dollars transferred.
  • Weighted_Price: This is derived from the volumes of BTC and USD. By dividing all dollars traded by all bitcoins, we can get the weighted average price of BTC during this minute. So Weighted_Price=Volume_(Currency)/Volume_(BTC).
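The column definitions can be sanity-checked in a few lines of plain Scala. This is a minimal sketch, not project code: the MinuteBar case class and its field names are hypothetical, and apart from the first two timestamps and the 4.39 open price quoted above, the values in the demo are illustrative.

```scala
import java.time.Instant

// Hypothetical in-memory model of one CSV row; the field names are illustrative
final case class MinuteBar(
    timestamp: Long,       // seconds since 1970-01-01T00:00:00Z
    open: Double,
    high: Double,
    low: Double,
    close: Double,
    volumeBtc: Double,     // Volume_(BTC)
    volumeCurrency: Double // Volume_(Currency), in USD
) {
  // Weighted_Price = Volume_(Currency) / Volume_(BTC); undefined if no BTC was traded
  def weightedPrice: Option[Double] =
    if (volumeBtc > 0) Some(volumeCurrency / volumeBtc) else None

  // Start of the one-minute interval as a UTC instant
  def startsAt: Instant = Instant.ofEpochSecond(timestamp)
}

object ColumnChecks {
  // Consecutive rows of a 1-minute dataset should be exactly 60 seconds apart
  def isOneMinuteApart(earlier: MinuteBar, later: MinuteBar): Boolean =
    later.timestamp - earlier.timestamp == 60L

  def main(args: Array[String]): Unit = {
    // Timestamps and open price from the text; the other values are illustrative
    val first  = MinuteBar(1325317920L, 4.39, 4.39, 4.39, 4.39, 2.0, 8.78)
    val second = first.copy(timestamp = 1325317980L)
    println(first.startsAt)                  // 2011-12-31T07:52:00Z
    println(isOneMinuteApart(first, second)) // true
    println(first.weightedPrice)
  }
}
```

Note that the first timestamp decodes to 31 December 2011 in UTC.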

One of the most important parts of the data science pipeline after data collection (which is, in a sense, outsourced: we use data collected by others) is data preprocessing, that is, cleaning a dataset and transforming it to suit our needs.

Transformation of historical data into a time series

Given our goal of predicting the direction of price change, we might ask ourselves: does having the actual price in dollars help to achieve this? Historically, the price of Bitcoin has mostly risen, so a regression fitted to the raw price would simply extrapolate further exponential growth (whether this will hold in the long run is yet to be seen).

Assumptions and design choices

One of the assumptions of this project is as follows: whether we are thinking about Bitcoin trading in November 2016 with a price of about $700, or trading in November 2017 with a price in the $6500-7000 range, patterns in how people trade are similar. Now, we have several other assumptions, as described in the following points:

  • Assumption one: From what has been said previously, we can ignore the actual price and rather look at its change. As a measure of this, we can take the delta between opening and closing prices. If it is positive, it means the price grew during that minute; the price went down if it is negative and stayed the same if delta = 0.
    In the following figure, we can see that Delta was -1.25 for the first minute observed, -12.83 for the second one, and -0.23 for the third one. Sometimes, the open price can differ significantly from the close price of the previous minute (although Delta is negative during all three observed minutes, the open price of the third minute was actually higher than the close price of the second). But such cases are not very common, and usually the open price does not change significantly compared to the close price of the previous minute.
  • Assumption two: We predict the price change in a black-box environment. We do not use other sources of knowledge, such as news or Twitter feeds, to predict how the market would react to them; that is a more advanced topic. The only data we use is price and volume, and for simplicity of the prototype, we can focus on price only and construct time series data.
    Time series prediction is a prediction of a parameter based on the values of this parameter in the past. One of the most common examples is temperature prediction. Although there are many supercomputers using satellite and sensor data to predict the weather, a simple time series analysis can lead to some valuable results. We predict the price at T+60 seconds, for instance, based on the price at T, T-60s, T-120s and so on.
  • Assumption three: Not all data in the dataset is valuable. The first 600,000 or so records are not informative, as price changes are rare and trading volumes are small. This can affect the model we are training and make the end results worse. That is why the first rows (612,000 in the preprocessing code) are eliminated from the dataset.
  • Assumption four: We need to label our data so that we can use a supervised ML algorithm. We use the simplest possible label, whether the price went up during the minute (Delta > 0) or not, without taking transaction fees into account.
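Assumptions one and four reduce to two small functions: Delta, the close price minus the open price, and a binary label taken from its sign. The following is a plain-Scala sketch with hypothetical names (the project computes the same values as Spark columns in the preprocessing code); the demo reuses the three Delta values quoted in Assumption one.

```scala
object DeltaLabels {
  // Assumption one: look at the change within a minute, not the price level
  def delta(open: Double, close: Double): Double = close - open

  // Three-way direction: 1 = price went up, -1 = went down, 0 = unchanged
  def direction(delta: Double): Int = math.signum(delta).toInt

  // Assumption four: binary label for supervised learning.
  // "Unchanged" is grouped with "down", as in when(Close - Open > 0, 1).otherwise(0)
  def label(delta: Double): Int = if (delta > 0) 1 else 0

  def main(args: Array[String]): Unit = {
    val deltas = Seq(-1.25, -12.83, -0.23)
    deltas.foreach(d => println(s"delta=$d direction=${direction(d)} label=${label(d)}"))
  }
}
```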

Data preprocessing

Taking into account the goals of data preparation, Scala was chosen as an easy and interactive way to manipulate data:

import org.apache.spark.sql.SparkSession

val priceDataFileName: String = "bitstampUSD_1-min_data_2012-01-01_to_2017-10-20.csv"

val spark = SparkSession
  .builder()
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "E:/Exp/")
  .appName("Bitcoin Preprocessing")
  .getOrCreate()

// Read the CSV; with only the header option set, all columns are loaded as strings
val data = spark.read.format("com.databricks.spark.csv").option("header", "true").load(priceDataFileName)
data.show(10)
data.show(10)
>>>
Figure: Bitcoin historical price dataset (the first 10 rows printed by data.show(10))

println((data.count(), data.columns.size))

>>>

(3045857,8)

In the preceding code, we load the data from the file downloaded from Kaggle and look at what is inside. There are 3,045,857 rows in the dataset and the 8 columns described before. Then we create the Delta column, containing the difference between the closing and opening prices:

val dataWithDelta = data.withColumn("Delta", data("Close") - data("Open"))

The following code labels our data by assigning 1 to the rows whose Delta value is positive and 0 otherwise, and then passes the labeled data to rollingWindow:

import org.apache.spark.sql.functions._
import spark.implicits._

// label = 1 if the price rose during the minute, 0 otherwise
val dataWithLabels = dataWithDelta.withColumn("label",
  when($"Close" - $"Open" > 0, 1).otherwise(0))

rollingWindow(dataWithLabels, 22, outputDataFilePath, outputLabelFilePath)

The rollingWindow method transforms the original dataset into time series data. It takes the Delta values of window consecutive rows (22 in this experiment) and makes a new row out of them. In this way, the first row has the Delta values from t0 to t21, the second one has the values from t1 to t22, and so on. For each row, it also writes the label of the minute that immediately follows the window (t22 for the first row), producing the corresponding array of labels (1 or 0).
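The windowing logic can be checked without Spark. The following is a minimal in-memory sketch, assuming the Delta and label columns are already available as Scala sequences; the name toWindows is hypothetical. Like rollingWindow, it optionally drops the first rows and pairs each run of window consecutive Delta values with the label of the minute right after the run.

```scala
object Windows {
  /** Builds (features, label) pairs: the features are `window` consecutive
    * deltas starting at i, and the label is the one of minute i + window. */
  def toWindows(deltas: IndexedSeq[Double],
                labels: IndexedSeq[Int],
                window: Int,
                dropFirst: Int = 0): IndexedSeq[(List[Double], Int)] = {
    require(deltas.length == labels.length, "deltas and labels must align")
    require(window > 0, "window must be positive")
    val d = deltas.drop(dropFirst)
    val l = labels.drop(dropFirst)
    // Same bound as rollingWindow: index i + window must still exist
    (0 until math.max(d.length - window, 0)).map { i =>
      (d.slice(i, i + window).toList, l(i + window))
    }
  }

  def main(args: Array[String]): Unit = {
    val deltas = Vector(0.5, -1.0, 2.0, 0.0, 3.0)
    val labels = deltas.map(x => if (x > 0) 1 else 0)
    toWindows(deltas, labels, window = 3).foreach(println)
  }
}
```

With five minutes and a window of 3, only two complete (features, label) pairs can be built, which mirrors the loop bound in rollingWindow.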

Finally, we save X and Y to files, after cutting the first 612,000 rows from the original dataset. Here, 22 is the rolling window size, and the labels are binary (two classes, 0 and 1):

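import java.io.{BufferedWriter, File, FileWriter}
import org.apache.spark.sql.DataFrame

val dropFirstCount: Int = 612000

def rollingWindow(data: DataFrame, window: Int, xFilename: String,
                  yFilename: String): Unit = {
  var i = 0
  val xWriter = new BufferedWriter(new FileWriter(new File(xFilename)))
  val yWriter = new BufferedWriter(new FileWriter(new File(yFilename)))
  // Pull all rows to the driver, each paired with its position in the dataset
  val zippedData = data.rdd.zipWithIndex().collect()
  System.gc()
  val dataStratified = zippedData.drop(dropFirstCount) // slice off the first 612K rows
  // Each output row holds `window` consecutive Delta values; its label is the next minute's
  while (i < (dataStratified.length - window)) {
    val x = dataStratified
      .slice(i, i + window)
      .map(r => r._1.getAs[Double]("Delta")).toList
    val y = dataStratified.apply(i + window)._1.getAs[Integer]("label")
    val stringToWrite = x.mkString(",")
    xWriter.write(stringToWrite + "\n")
    yWriter.write(s"$y\n")
    i += 1
    if (i % 10 == 0) {
      xWriter.flush()
      yWriter.flush()
    }
  }
  xWriter.close()
  yWriter.close()
}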

The output file paths passed to rollingWindow are defined as follows:

val outputDataFilePath: String = "output/scala_test_x.csv"
val outputLabelFilePath: String = "output/scala_test_y.csv"

Real-time data through the Cryptocompare API

For real-time data, the Cryptocompare API is used, more specifically HistoMinute, which gives us access to OHLC data for the past seven days at most. The details of the API will be discussed in a section devoted to implementation, but the API response is very similar to our historical dataset, and this data is retrieved using a regular HTTP request. For example, a simple JSON response has the following structure:

{
  "Response": "Success",
  "Type": 100,
  "Aggregated": false,
  "Data": [
    {"time": 1510774800, "close": 7205, "high": 7205, "low": 7192.67, "open": 7198,
     "volumefrom": 81.73, "volumeto": 588726.94},
    {"time": 1510774860, "close": 7209.05, "high": 7219.91, "low": 7205, "open": 7205,
     "volumefrom": 16.39, "volumeto": 118136.61},
    ... (other price data)
  ],
  "TimeTo": 1510776180,
  "TimeFrom": 1510774800,
  "FirstValueInArray": true,
  "ConversionType": {"type": "force_direct", "conversionSymbol": ""}
}

Through Cryptocompare HistoMinute, we can get open, high, low, close, volumefrom, and volumeto for each minute of historical data. This data is stored for 7 days only; if you need more, use the hourly or daily endpoints. If data is not available because the coin is not traded in the specified currency, the API uses BTC conversion:

Figure: Volume values through Cryptocompare HistoMinute
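Because HistoMinute is queried with a plain HTTP GET, the request URL is just a base path plus query parameters. The sketch below builds such a URL in plain Scala. The base URL https://min-api.cryptocompare.com/data/histominute and the parameter names fsym, tsym, limit, and aggregate reflect the public Cryptocompare API as we understand it, not the project's code, so check them against the current API documentation before relying on them. The HistoMinuteUrl object is hypothetical.

```scala
import java.net.URLEncoder
import java.nio.charset.StandardCharsets

object HistoMinuteUrl {
  // Assumed public endpoint; check it against the current Cryptocompare API docs
  val BaseUrl = "https://min-api.cryptocompare.com/data/histominute"

  /** Builds a GET URL for the last `limit` bars of fsym priced in tsym,
    * with `aggregate` minutes per bar (1 = one bar per minute). */
  def build(fsym: String, tsym: String, limit: Int, aggregate: Int = 1): String = {
    require(limit > 0, "limit must be positive")
    require(aggregate >= 1, "aggregate must be at least 1")
    val params = Seq(
      "fsym"      -> fsym,
      "tsym"      -> tsym,
      "limit"     -> limit.toString,
      "aggregate" -> aggregate.toString
    )
    // URL-encode each value and join the pairs into a query string
    val query = params
      .map { case (k, v) => s"$k=${URLEncoder.encode(v, StandardCharsets.UTF_8.name())}" }
      .mkString("&")
    s"$BaseUrl?$query"
  }

  def main(args: Array[String]): Unit = {
    // Request roughly one rolling window's worth of minute bars
    println(build("BTC", "USD", limit = 22))
  }
}
```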

The following method takes a fully formed Cryptocompare API URL, with all parameters (such as currency, limit, and aggregation) already specified, and sends the request. It returns a Future that will hold the response body parsed into the data model, with the price list to be processed at an upper level:

import javax.inject.Inject
import scala.concurrent.{ExecutionContext, Future}
import play.api.libs.json.JsResult
import play.api.libs.ws._
import processing.model.CryptoCompareResponse

// The execution context is injected instead of using the global
// play.api.libs.concurrent.Execution.Implicits.defaultContext, which newer Play versions deprecate
class RestClient @Inject() (ws: WSClient)(implicit ec: ExecutionContext) {
  def getPayload(url: String): Future[JsResult[CryptoCompareResponse]] = {
    val request: WSRequest = ws.url(url)
    val future = request.get()
    // Validate the JSON body against the CryptoCompareResponse model
    future.map { response =>
      response.json.validate[CryptoCompareResponse]
    }
  }
}

In the preceding code segment, the CryptoCompareResponse class is the model of the API response, which takes the following parameters:

  • Response
  • Type
  • Aggregated
  • Data
  • FirstValueInArray
  • TimeTo
  • TimeFrom

It has the following signature:

import play.api.libs.json.{Json, Reads}

case class CryptoCompareResponse(Response: String,
                                 Type: Int,
                                 Aggregated: Boolean,
                                 Data: List[OHLC],
                                 FirstValueInArray: Boolean,
                                 TimeTo: Long,
                                 TimeFrom: Long)

object CryptoCompareResponse {
  // Macro-generated JSON reader; it relies on the implicit Reads[OHLC] in the OHLC companion
  implicit val cryptoCompareResponseReads: Reads[CryptoCompareResponse] =
    Json.reads[CryptoCompareResponse]
}

Similarly, OHLC (open-high-low-close) is a model class that maps the internals of the Data array in the Cryptocompare API response. It takes these parameters:

  • Time: Timestamp in seconds, 1508818680, for instance.
  • Open: Open price at a given minute interval.
  • High: Highest price.
  • Low: Lowest price.
  • Close: Price at the closing of the interval.
  • Volumefrom: Trading volume in the from currency. It’s BTC in our case.
  • Volumeto: The trading volume in the to currency, USD in our case.

Dividing Volumeto by Volumefrom gives us the weighted price of BTC.

The OHLC class has the following signature:

import play.api.libs.json.{Json, Reads}

case class OHLC(time: Long,
                open: Double,
                high: Double,
                low: Double,
                close: Double,
                volumefrom: Double,
                volumeto: Double)

object OHLC {
  implicit val implicitOHLCReads: Reads[OHLC] = Json.reads[OHLC]
}
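Before the trained model can score live data, the OHLC list has to be turned into the same shape as a training row: the 22 most recent Delta values in time order. The following plain-Scala sketch shows one way to do that. It redeclares OHLC without the Play JSON companion so the snippet compiles on its own, and the helper names latestDeltas and weightedPrice are hypothetical; the demo uses the two bars from the sample JSON response.

```scala
final case class OHLC(time: Long, open: Double, high: Double, low: Double,
                      close: Double, volumefrom: Double, volumeto: Double)

object LiveFeatures {
  // Weighted price of BTC over one minute: USD volume divided by BTC volume
  def weightedPrice(bar: OHLC): Option[Double] =
    if (bar.volumefrom > 0) Some(bar.volumeto / bar.volumefrom) else None

  /** Returns the last `window` Delta values (close - open), oldest first,
    * or None if the response does not contain enough minutes. */
  def latestDeltas(bars: Seq[OHLC], window: Int = 22): Option[Vector[Double]] = {
    val recent = bars.sortBy(_.time).takeRight(window)
    if (recent.length < window) None
    else Some(recent.map(b => b.close - b.open).toVector)
  }

  def main(args: Array[String]): Unit = {
    val sample = Seq(
      OHLC(1510774800L, 7198, 7205, 7192.67, 7205, 81.73, 588726.94),
      OHLC(1510774860L, 7205, 7219.91, 7205, 7209.05, 16.39, 118136.61)
    )
    println(latestDeltas(sample, window = 2))
    sample.flatMap(weightedPrice).foreach(println)
  }
}
```

Sorting by time first means the feature order does not depend on the order in which the API returns the bars.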

Model training for prediction

Inside the project, in the package folder prediction.training, there is a Scala object called TrainGBT.scala. Before launching it, you have to specify or change four things:

  • In the code, set spark.sql.warehouse.dir to an actual location on your computer that has several gigabytes of free space: set("spark.sql.warehouse.dir", "/home/user/spark")
  • The rootDir is the main folder, where all files and trained models will be stored: rootDir = "/home/user/projects/btc-prediction/"
  • Make sure that the x filename matches the one produced by the Scala script in the preceding step: x = spark.read.format("com.databricks.spark.csv").schema(xSchema).load(rootDir + "scala_test_x.csv")
  • Make sure that the y filename matches the one produced by the Scala script: y_tmp = spark.read.format("com.databricks.spark.csv").schema(ySchema).load(rootDir + "scala_test_y.csv")

The training code uses the Apache Spark ML library (and the libraries it requires) to train the classifier, so they have to be on your classpath to run it. The easiest way to do that (since the whole project uses SBT) is to run it from the project root folder by typing sbt "runMain prediction.training.TrainGBT", which will resolve all dependencies and launch training.

Depending on the number of iterations and the depth, training the model can take several hours. Now let us see how training is performed, using the gradient-boosted trees model as an example. After creating a SparkSession object, we define the schema of the feature file, with one Double column per Delta value in the window:

val xSchema = StructType(Array(
  StructField("t0", DoubleType, true),
  StructField("t1", DoubleType, true),
  StructField("t2", DoubleType, true),
  StructField("t3", DoubleType, true),
  StructField("t4", DoubleType, true),
  StructField("t5", DoubleType, true),
  StructField("t6", DoubleType, true),
  StructField("t7", DoubleType, true),
  StructField("t8", DoubleType, true),
  StructField("t9", DoubleType, true),
  StructField("t10", DoubleType, true),
  StructField("t11", DoubleType, true),
  StructField("t12", DoubleType, true),
  StructField("t13", DoubleType, true),
  StructField("t14", DoubleType, true),
  StructField("t15", DoubleType, true),
  StructField("t16", DoubleType, true),
  StructField("t17", DoubleType, true),
  StructField("t18", DoubleType, true),
  StructField("t19", DoubleType, true),
  StructField("t20", DoubleType, true),
  StructField("t21", DoubleType, true))
)

Then we read the two files using these schemas (xSchema for the features and ySchema for the labels). It was more convenient to generate separate files in Scala for data and labels, so here we have to join them into a single DataFrame:


import spark.implicits._
val y = y_tmp.withColumn("y", 'y.cast(IntegerType))
import org.apache.spark.sql.functions._
val x_id = x.withColumn("id", monotonically_increasing_id())
val y_id = y.withColumn("id", monotonically_increasing_id())
val data = x_id.join(y_id, "id")

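Note that monotonically_increasing_id() guarantees unique, increasing IDs but not consecutive ones (the partition ID is stored in the upper bits), so this join lines up feature and label rows only if both files are split into partitions the same way. The next step is required by Spark: we need to assemble the features into a single vector column: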

val featureAssembler = new VectorAssembler()
  .setInputCols(Array("t0", "t1", "t2", "t3",
                      "t4", "t5", "t6", "t7",
                      "t8", "t9", "t10", "t11",
                      "t12", "t13", "t14", "t15",
                      "t16", "t17", "t18", "t19",
                      "t20", "t21"))
  .setOutputCol("features")
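Listing t0 through t21 by hand in both the schema and the VectorAssembler is error-prone. One option, shown here as a sketch, is to generate the names once and reuse them. The FeatureColumns object is a hypothetical helper; the Spark calls in the comments are the same StructType, StructField, and setInputCols used above.

```scala
object FeatureColumns {
  val WindowSize = 22

  // "t0", "t1", ..., "t21": one column per Delta value in the rolling window
  val names: Array[String] = Array.tabulate(WindowSize)(i => s"t$i")

  // With Spark on the classpath, the same array can drive both definitions:
  //   val xSchema = StructType(names.map(n => StructField(n, DoubleType, true)))
  //   new VectorAssembler().setInputCols(names).setOutputCol("features")

  def main(args: Array[String]): Unit =
    println(names.mkString(", "))
}
```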

We split the data into training and test sets randomly in a 75% to 25% proportion (dataWithLabels is the joined data with a numeric label column added, as shown in the full listing). We set the seed so that the split is the same every time we run the training:

val Array(trainingData, testData) = dataWithLabels.randomSplit(Array(0.75, 0.25), 123)
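The point of passing a fixed seed to randomSplit is reproducibility. The following plain-Scala sketch illustrates the idea with a simple per-element 75/25 coin flip driven by scala.util.Random. It is not Spark's implementation (Spark samples each partition separately), only a demonstration that the same seed yields the same split; SeededSplit is a hypothetical name.

```scala
import scala.util.Random

object SeededSplit {
  /** Puts each element in the training set with probability trainFraction. */
  def split[A](data: Seq[A], trainFraction: Double, seed: Long): (Seq[A], Seq[A]) = {
    require(trainFraction >= 0.0 && trainFraction <= 1.0)
    val rng = new Random(seed)
    // One draw per element, in order, so the result depends only on the data and the seed
    val (train, test) = data.map(x => (x, rng.nextDouble() < trainFraction)).partition(_._2)
    (train.map(_._1), test.map(_._1))
  }

  def main(args: Array[String]): Unit = {
    val (train, test) = split(1 to 1000, 0.75, seed = 123L)
    println(s"train=${train.size} test=${test.size}")
  }
}
```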

We then define the model, telling it which column holds the features and which holds the labels, and setting its parameters:

val gbt = new GBTClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setMaxIter(10)
  .setSeed(123)

Next, we create a pipeline of two steps, assembling the feature vector and running GBT:

val pipeline = new Pipeline()
  .setStages(Array(featureAssembler, gbt))

We then define the evaluator, which is how we measure whether the model is doing well. As we have only two classes and they are imbalanced, accuracy is a poor measure; the area under the ROC curve is better:

val rocEvaluator = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setRawPredictionCol("rawPrediction")
  .setMetricName("areaUnderROC")
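The area under the ROC curve has a convenient interpretation: it is the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one, with ties counting as one half. The sketch below computes it directly from that definition in O(P×N) time, which is fine for illustration but not for millions of rows; BinaryClassificationEvaluator approximates the same quantity from points on the ROC curve. The RocAuc object is hypothetical.

```scala
object RocAuc {
  /** AUC as P(score of a random positive > score of a random negative),
    * counting ties as 1/2. Labels are 1 (positive) or 0 (negative). */
  def auc(scored: Seq[(Double, Int)]): Double = {
    val pos = scored.collect { case (s, 1) => s }
    val neg = scored.collect { case (s, 0) => s }
    require(pos.nonEmpty && neg.nonEmpty, "need both classes")
    val wins = for (p <- pos; n <- neg)
      yield if (p > n) 1.0 else if (p == n) 0.5 else 0.0
    wins.sum / (pos.size.toDouble * neg.size)
  }

  def main(args: Array[String]): Unit = {
    val scored = Seq((0.9, 1), (0.8, 1), (0.85, 0), (0.1, 0))
    println(auc(scored)) // 3 of the 4 positive/negative pairs are ranked correctly: 0.75
  }
}
```

A model that ranks examples at random scores about 0.5 by this definition.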

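K-fold cross-validation is used to avoid overfitting: at each iteration, it holds out one of k roughly equal parts (folds) of the training data, trains the model on the rest, and then tests it on the held-out fold. With numFolds = 10, as in the full listing, one-tenth of the data is held out each time. The hyperparameter combinations to try are given in paramGrid:

val cv = new CrossValidator()
  .setEstimator(pipeline)
  .setEvaluator(rocEvaluator)
  .setEstimatorParamMaps(paramGrid)
  .setNumFolds(numFolds)
  .setSeed(123)

val cvModel = cv.fit(trainingData)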
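Two small pieces of arithmetic explain why this step is slow. With k folds, every parameter combination in paramGrid is trained k times, and CrossValidator then refits the best combination on the whole training set. The sketch below, in plain Scala with hypothetical names, assigns row indices to folds round-robin for simplicity (Spark assigns rows to folds randomly, using the seed) and counts the fits for the grid in the full listing: three maxBins values, one value each for maxIter and maxDepth, and numFolds = 10.

```scala
object KFold {
  /** Assigns indices 0 until n to k folds round-robin; returns (train, test) per fold. */
  def folds(n: Int, k: Int): Seq[(Seq[Int], Seq[Int])] = {
    require(k >= 2 && k <= n, "need 2 <= k <= n")
    (0 until k).map { j =>
      val (test, train) = (0 until n).partition(_ % k == j)
      (train, test)
    }
  }

  /** Model fits: one per (parameter combination, fold), plus the final refit. */
  def totalFits(gridSizes: Seq[Int], k: Int): Int = gridSizes.product * k + 1

  def main(args: Array[String]): Unit = {
    folds(10, 5).foreach { case (train, test) => println(s"train=${train.size} test=$test") }
    // maxBins has 3 values, maxIter and maxDepth 1 each; numFolds = 10
    println(totalFits(Seq(3, 1, 1), k = 10)) // 31
  }
}
```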

After we get the trained model (which can take an hour or more depending on the number of iterations and parameters we want to iterate on, specified in paramGrid), we then compute the predictions on the test data:

val predictions = cvModel.transform(testData)

We then evaluate the quality of the predictions:

val roc = rocEvaluator.evaluate(predictions)

The trained model is saved for later usage by the prediction service:

val gbtModel = cvModel.bestModel.asInstanceOf[PipelineModel]

gbtModel.save(rootDir + "__cv__gbt_22_binary_classes_" + System.nanoTime()/ 1000000 + ".model")

In summary, the code for model training is given as follows:

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.GBTClassifier
import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types.{DoubleType, IntegerType, StructField, StructType}

object TrainGradientBoostedTree {
  def main(args: Array[String]): Unit = {
    val maxBins = Seq(5, 7, 9)
    val numFolds = 10
    val maxIter: Seq[Int] = Seq(10)
    val maxDepth: Seq[Int] = Seq(20)
    val rootDir = "output/"

    val spark = SparkSession
      .builder()
      .master("local[*]")
      .config("spark.sql.warehouse.dir", "/home/user/spark/")
      .appName("Bitcoin Preprocessing")
      .getOrCreate()

    val xSchema = StructType(Array(
      StructField("t0", DoubleType, true),
      StructField("t1", DoubleType, true),
      StructField("t2", DoubleType, true),
      StructField("t3", DoubleType, true),
      StructField("t4", DoubleType, true),
      StructField("t5", DoubleType, true),
      StructField("t6", DoubleType, true),
      StructField("t7", DoubleType, true),
      StructField("t8", DoubleType, true),
      StructField("t9", DoubleType, true),
      StructField("t10", DoubleType, true),
      StructField("t11", DoubleType, true),
      StructField("t12", DoubleType, true),
      StructField("t13", DoubleType, true),
      StructField("t14", DoubleType, true),
      StructField("t15", DoubleType, true),
      StructField("t16", DoubleType, true),
      StructField("t17", DoubleType, true),
      StructField("t18", DoubleType, true),
      StructField("t19", DoubleType, true),
      StructField("t20", DoubleType, true),
      StructField("t21", DoubleType, true)))
    val ySchema = StructType(Array(StructField("y", DoubleType, true)))

    val x = spark.read.format("csv").schema(xSchema).load(rootDir + "scala_test_x.csv")
    val y_tmp = spark.read.format("csv").schema(ySchema).load(rootDir + "scala_test_y.csv")

    import spark.implicits._
    val y = y_tmp.withColumn("y", $"y".cast(IntegerType))

    // Joining the 2 separate datasets into a single Spark DataFrame
    val x_id = x.withColumn("id", monotonically_increasing_id())
    val y_id = y.withColumn("id", monotonically_increasing_id())
    val data = x_id.join(y_id, "id")

    val featureAssembler = new VectorAssembler()
      .setInputCols(Array("t0", "t1", "t2", "t3", "t4", "t5",
                          "t6", "t7", "t8", "t9", "t10", "t11",
                          "t12", "t13", "t14", "t15", "t16",
                          "t17", "t18", "t19", "t20", "t21"))
      .setOutputCol("features")

    // The classifier expects a numeric (Double) label column; y holds 0 or 1
    val dataWithLabels = data.withColumn("label", $"y".cast(DoubleType))

    // 123 is the seed, so the data split is the same on every run and params can be tuned
    val Array(trainingData, testData) = dataWithLabels.randomSplit(Array(0.75, 0.25), 123)

    val gbt = new GBTClassifier()
      .setLabelCol("label")
      .setFeaturesCol("features")
      .setMaxIter(10)
      .setSeed(123)

    val pipeline = new Pipeline()
      .setStages(Array(featureAssembler, gbt))

    // ***********************************************************
    println("Preparing K-fold Cross Validation and Grid Search")
    // ***********************************************************
    val paramGrid = new ParamGridBuilder()
      .addGrid(gbt.maxIter, maxIter)
      .addGrid(gbt.maxDepth, maxDepth)
      .addGrid(gbt.maxBins, maxBins)
      .build()

    val cv = new CrossValidator()
      .setEstimator(pipeline)
      .setEvaluator(new BinaryClassificationEvaluator())
      .setEstimatorParamMaps(paramGrid)
      .setNumFolds(numFolds)
      .setSeed(123)

    // ************************************************************
    println("Training model with GradientBoostedTrees algorithm")
    // ************************************************************
    // Train the model; the whole pipeline runs once per fold and parameter combination
    val cvModel = cv.fit(trainingData)
    cvModel.save(rootDir + "cvGBT_22_binary_classes_" + System.nanoTime() / 1000000 + ".model")

    // ************************************************************
    println("Evaluating model on test data: area under ROC and PR curves")
    // ************************************************************
    val predictions = cvModel.transform(testData)

    val rocEvaluator = new BinaryClassificationEvaluator()
      .setLabelCol("label")
      .setRawPredictionCol("rawPrediction")
      .setMetricName("areaUnderROC")
    val roc = rocEvaluator.evaluate(predictions)

    val prEvaluator = new BinaryClassificationEvaluator()
      .setLabelCol("label")
      .setRawPredictionCol("rawPrediction")
      .setMetricName("areaUnderPR")
    val pr = prEvaluator.evaluate(predictions)

    val gbtModel = cvModel.bestModel.asInstanceOf[PipelineModel]
    gbtModel.save(rootDir + "__cv__gbt_22_binary_classes_" + System.nanoTime() / 1000000 + ".model")

    println("Area under ROC curve = " + roc)
    println("Area under PR curve= " + pr)
    // Show a few (label, prediction) pairs from the test set
    predictions.select("label", "prediction").show(5)

    spark.stop()
  }
}

Now let us see how the training went:

>>>
Area under ROC curve = 0.6045355104779828
Area under PR curve= 0.3823834607704922

The result is therefore modest: the area under the ROC curve of the best GBT model is only about 0.60 (60.45%), where 0.5 corresponds to random guessing. Tuning the hyperparameters further may improve it.

We learned how a complete ML pipeline can be implemented, from collecting historical data, to transforming it into a format suitable for testing hypotheses. We also performed machine learning model training to carry out predictions.

You read an excerpt from a book written by Md. Rezaul Karim, titled Scala Machine Learning Projects. In this book, you will learn to build powerful machine learning applications for performing advanced numerical computing and functional programming.
