Ready to step into the era of real-time Big Data? In this tutorial series, we will cover the storage and processing layers by handling a data stream with two frameworks well suited to streaming: Couchbase for storage and Spark for processing.
COLLECT: We will simulate a stream that will be processed in real-time.
STORE: The data will be persistently inserted into Couchbase, our distributed storage base.
PROCESS: The Spark processing layer will detect mutations within the storage base, process the data, and then return the results on the fly by injecting them into Couchbase.
VISUALIZE: The results can be visualized in pseudo real-time, which is beyond the scope of this tutorial.
Why Couchbase?
Couchbase is a distributed, memory-centric NoSQL database: it is scalable and sustains high throughput on incoming streams, and these qualities combined make it ideal for real-time storage. Compared to a traditional relational database, NoSQL scales out more easily and delivers better performance for this kind of workload. Additionally, if you need to analyze a large volume of data with a changing structure, Couchbase is your ally.
Why Spark?
Spark, with its well-established reputation, enables very fast, MapReduce-style distributed processing and combines SQL, streaming, and machine learning analyses on the same data. With its "ML" library, it is possible to train models. The "Spark Streaming" library, combined with the Couchbase / Spark connector, lets you retrieve new data from Couchbase in real time and run it through a model built with Spark. In the same way that data is retrieved, results can be inserted back into Couchbase from Spark.
Let's go! It's time to create our Real-Time Big Data architecture!
Creating the Couchbase Cluster
Main goal of the tutorial:
- Deployment of a Couchbase cluster ~ 15 minutes
What you need:
- X Ubuntu 14.04 LTS machines (as of now, Couchbase is not compatible with Ubuntu 16.04). The more data, the greater the need for storage capacity. You can use as many nodes as you want.
Deploying the Couchbase Cluster:
Before getting practical, it is worth recalling the main challenge of this part: we want to set up a Couchbase cluster that will constitute our distributed storage base, composed of several machines, called nodes. Our cluster must be scalable and support incoming data flow. As a reminder, Couchbase Server is a NoSQL database based on a distributed architecture that reconciles performance, scalability, and availability. Among other things, it allows developers to easily and quickly build distributed storage bases using the power of SQL and JSON flexibility. A Couchbase cluster has no master and slave nodes; all nodes are equal, and data is automatically replicated to avoid a single point of failure. Thus, if a node is no longer operational, the data will still be accessible! We use Couchbase in this tutorial because it is suitable for real-time storage.
We will use 3 machines on the same subnet on which we will deploy our Couchbase cluster.
The IP addresses and names of your machines may differ, so you should replace them accordingly throughout this tutorial.
Step 0: Preconfiguration
All nodes in your cluster must be visible, so we will configure the /etc/hosts file on each of your machines:
Open the /etc/hosts file:
$ vi /etc/hosts
And replace its content with:
127.0.0.1 localhost
192.168.3.174 node1.local node1
192.168.3.175 node2.local node2
192.168.3.176 node3.local node3
At the end of this step, you can verify that the nodes are pinging each other.
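For example, from node1 (using the hostnames defined above):
$ ping -c 2 node2
$ ping -c 2 node3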
Step 1: Installing Couchbase on all nodes
Perform these operations on all nodes.
Download the Couchbase Server 4.5.1 (current version) installation file:
$ wget http://packages.couchbase.com/releases/4.5.1/couchbase-server-enterprise_4.5.1-ubuntu14.04_amd64.deb
Install OpenSSL:
$ apt-get install libssl1.0.0
Install Couchbase Server 4.5.1:
$ dpkg -i couchbase-server-enterprise_4.5.1-ubuntu14.04_amd64.deb
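You can check that the Couchbase service started correctly after installation:
$ sudo service couchbase-server status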
Step 2: Creating the Couchbase Cluster
We will create the Couchbase cluster, starting with one node and gradually attaching the other nodes to the cluster.
In your browser, go to the address:
192.168.3.174:8091
An interface should appear.
Step 1 of 5: Allocate RAM for your data and Index, define the hostname
Step 2 of 5: Click Next
Step 3 of 5: Click Next
Step 4 of 5: Accept the terms and click Next
Step 5 of 5: Configure your administrator name and password, then click Next
At this stage: you have created a Couchbase cluster with one node. You now need to attach the other nodes to the cluster.
In your browser, go to the address (the operations are to be performed for node 2 and 3 in our case):
192.168.3.175:8091
Click on "Join a cluster now", enter the cluster IP address, username, and password, then click Next.
Go to the "Server Nodes" panel, then "Pending Rebalance", and click "Rebalance".
At the end of this step: the Couchbase cluster has been created. To verify that your cluster is functioning properly, go to the "Overview" panel and check that there are 3 active servers.
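As an alternative to the Web UI, you can also list the active servers from the command line with couchbase-cli (replace the administrator credentials with the ones you configured):
$ /opt/couchbase/bin/couchbase-cli server-list -c 192.168.3.174:8091 -u Administrator -p password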
Great! Now that we have our cluster, we can generate a data stream and upsert it in real-time into the storage base.
Simulating a Data Stream and Upserting into the Storage Base
Main goal of the tutorial:
- Configuration of the Couchbase cluster to accommodate the stream ~ 5 minutes
- Generating the data stream and upserting into Couchbase ~ 5 minutes
What you need:
- A Couchbase cluster
In this tutorial, the stream will be generated automatically with a Python script, but the same principle can be applied to a stream coming from an API, a data lake, etc. We will simulate banking transactions that all share the same format: Transaction id, Card id, Card type, Amount, @timestamp
For example:
Transaction id: 123456789
Card id: 4815162342
Card type: Amex
Amount: 100.97$
@timestamp : 20-06-2017 13:50:23.345676
The transactions will be generated non-periodically, constituting our data stream, and then each transaction will be converted into JSON format to be inserted into the Couchbase cluster.
For example, the transaction shown above will be converted to JSON format before being inserted into Couchbase:
{
  "transaction_id": 123456789,
  "card_id": 4815162342,
  "card_type": "Amex",
  "amount": 100.97,
  "@timestamp": "20-06-2017 13:50:23.345676"
}
Couchbase Configuration:
You need to define the bucket into which you will insert the data; by default, the bucket is 'default'. You need to index this bucket to be able to query the documents:
Go to the "Query" panel and execute the index creation statement shown below.
Now, your Couchbase cluster is ready to receive the data stream.
Generating the Data Stream and Upserting into Couchbase:
The data can be generated on a machine outside the cluster or on one of our cluster's nodes.
To do this, we will install the Couchbase Python SDK on the machine where we want to generate the data:
$ apt-get install python
$ wget http://packages.couchbase.com/releases/couchbase-release/couchbase-release-1.0-2-amd64.deb
$ dpkg -i couchbase-release-1.0-2-amd64.deb
$ apt-get update
$ apt-get install libcouchbase-dev build-essential python-dev python-pip
$ pip install couchbase
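You can quickly check that the SDK is usable (it should print the installed SDK version):
$ python -c "import couchbase; print(couchbase.__version__)"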
Our transaction generator:
Note: for the upserts to work, change the IP address in the code to that of one of the nodes in your Couchbase cluster.
# Transaction generator
from couchbase.bucket import Bucket
from couchbase.n1ql import N1QLQuery
from collections import defaultdict
import json
import time
import random
import datetime

# Bind our bucket 'default' on the default port
bucket = Bucket('couchbase://192.168.3.174/default')

transaction_id = 0
card_id = 0
card_type_list = ["Amex", "CB", "Visa", "Mastercard"]
amount = 0
timestamp_event = datetime.datetime.now()

while True:
    # Short delay most of the time, occasionally a longer pause: the stream is non-periodic
    if random.randint(0, 10) > 3:
        delay = random.uniform(0, 0.05)
    else:
        delay = random.uniform(0, 3)
    timestamp_event += datetime.timedelta(seconds=delay)
    transaction_id += 1
    card_id = random.randint(0, 10000)
    card_type = card_type_list[random.randint(0, 3)]
    amount = random.uniform(0, 1000)
    # Build the JSON document
    transaction = {}
    transaction['transaction_id'] = transaction_id
    transaction['card_id'] = card_id
    transaction['card_type'] = card_type
    transaction['amount'] = amount
    transaction['@timestamp'] = str(timestamp_event)
    print(transaction)
    # Upsert the JSON document into the Couchbase bucket
    bucket.upsert("transaction_" + str(transaction_id), transaction)
    time.sleep(delay)
When you run this code, you should see the data being upserted in real-time into Couchbase.
The data streamed into Couchbase is stored as NoSQL (JSON) documents: the data stream is represented by transactions carrying a notion of temporality via the @timestamp field.
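You can also check the inserted documents directly from the "Query" panel with a simple N1QL query (this relies on the primary index created earlier):
SELECT * FROM `default` LIMIT 5;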
At the end of this step: the data is inserted in real-time into Couchbase, and it is possible to modify the sample code to upsert data from different sources: API, database, etc.
Great! We are now storing the stream in our Couchbase cluster.
Retrieving and Processing Data with Spark in Streaming
Main goal of the tutorial:
- Deployment of the Spark cluster ~ 5 minutes
- Configuration of the Couchbase / Spark connector ~ 10 minutes
- Retrieving the data stream in Spark streaming ~ 10 minutes
What you need:
- A Couchbase cluster
- The data stream upserted in real-time into Couchbase (see previous tutorial)
Deploying the Spark Cluster:
A Spark cluster consists of a master node and several worker nodes. The master only handles cluster management; the workers host the executors that run the parallelized (MapReduce-style) jobs.
To run a job on the Spark cluster, you submit an application whose execution is coordinated by a driver. Two execution modes are possible:
- client mode: the driver is created on the machine that submits the application
- cluster mode: the driver is created inside the cluster
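For illustration, the mode is chosen when the application is submitted with spark-submit's --deploy-mode option; in the sketch below, the class name and jar path are placeholders for your own application:
$ spark-2.1.0-bin-hadoop2.7/bin/spark-submit --master spark://<master-address>:7077 --deploy-mode client --class StreamingExample target/scala-2.11/your-app_2.11-1.0.jar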
Step 1: Installing Spark on all nodes of the cluster
The first step is to download Spark on each of the nodes of your cluster.
$ wget http://d3kbcqa49mib13.cloudfront.net/spark-2.1.0-bin-hadoop2.7.tgz
$ tar -zxvf spark-2.1.0-bin-hadoop2.7.tgz
As of now, the version of Spark is 2.1.0, but you can choose a newer version if you prefer.
Step 2: Installing SBT and Scala on all nodes
$ echo "deb https://dl.bintray.com/sbt/debian /" | sudo tee -a /etc/apt/sources.list.d/sbt.list
$ sudo apt-key adv --keyserver hkp://keyserver.ubuntu.com:80 --recv 2EE0EA64E40A89B84B2DF73499E82A75642AC823
$ apt-get update
$ apt-get install sbt
$ apt-get install default-jdk
$ apt-get install scala
$ apt-get update
Step 3: Starting the master node
$ spark-2.1.0-bin-hadoop2.7/sbin/start-master.sh
Options:
- -i IP, --ip IP : IP address to bind to
- -p PORT, --port PORT : port used for internal communication
- --webui-port PORT : port for the Web interface
By default:
- the port for internal communication is port 7077 for the master, a random port for the workers
- the port for the Web interface is port 8080 for the master, port 8081 by default for the first worker, then 8082, 8083, etc...
You can then open a browser to the address of your master node on port 8080 to monitor the status of your cluster.
Step 4: Starting the workers
$ sudo spark-2.1.0-bin-hadoop2.7/bin/spark-class org.apache.spark.deploy.worker.Worker spark://<master-address>:7077 --cores 2 --memory 4G
Options:
- -c CORES, --cores CORES : number of cores allocated
- -m MEM, --memory MEM : amount of memory allocated
* : by default, a worker uses all the memory of the machine minus 1 GB, which is left to the OS.
** : a master node can also be a worker
Your Spark cluster is now created, and we will allow it to receive the data stream from Couchbase through a connector.
Configuration of the Couchbase / Spark connector:
I suggest you follow the Spark quickstart to familiarize yourself with deploying an application with SBT: https://spark.apache.org/docs/latest/quick-start.html. At the end of this tutorial, you will be familiar with deploying a Scala application on your Spark cluster.
The library of the connector we will use is available at: https://github.com/couchbase/couchbase-spark-connector
To add the Spark / Couchbase connector, the simplest way is to add this line to your build.sbt:
libraryDependencies += "com.couchbase.client" %% "spark-connector" % "1.2.0"
When you build your application, the Couchbase / Spark connector will be fetched automatically before the application is deployed to the Spark cluster. To use it in your Scala code, simply include this import line:
import com.couchbase.spark.streaming._
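For reference, a minimal build.sbt for such an application could look like the sketch below (assumptions: Spark 2.1.0 and Scala 2.11; the project name is a placeholder, and the connector version should match your Spark version as indicated on the connector's GitHub page):
name := "streaming-example"  // placeholder project name
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-core" % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-streaming" % "2.1.0" % "provided",
  "org.apache.spark" %% "spark-sql" % "2.1.0" % "provided",
  "com.couchbase.client" %% "spark-connector" % "1.2.0"
)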
Now it's time to create the Scala code that will allow us to retrieve the data from Couchbase in streaming:
import com.couchbase.spark._
import com.couchbase.spark.sql._
import com.couchbase.spark.streaming._
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.streaming._
import org.apache.spark.{SparkConf, SparkContext}
import com.couchbase.client.java.document.{JsonArrayDocument, JsonDocument, Document}
import com.couchbase.client.java.document.json.{JsonArray, JsonObject}
import scala.util.parsing.json._

object StreamingExample {

  Logger.getLogger("org").setLevel(Level.OFF)
  Logger.getLogger("akka").setLevel(Level.OFF)

  // Spark configuration: connect to the 'default' Couchbase bucket (no password)
  val conf = new SparkConf()
    .setMaster("local[*]")
    .setAppName("StreamingExample")
    .set("com.couchbase.bucket.default", "")

  val sc = new SparkContext(conf)
  val ssc = new StreamingContext(sc, Seconds(10))
  ssc.checkpoint(".")

  def main(args: Array[String]): Unit = {

    // Listen to the Couchbase bucket, keep only mutations, and extract the document content
    val stream = ssc
      .couchbaseStream(from = FromBeginning, to = ToInfinity)
      .filter(_.isInstanceOf[Mutation])
      .map(m => new String(m.asInstanceOf[Mutation].content))
      .map(s =>
        (
          JSON.parseFull(s).get.asInstanceOf[Map[String, Any]].get("amount").getOrElse(-1),     // amount (double)
          JSON.parseFull(s).get.asInstanceOf[Map[String, Any]].get("@timestamp").getOrElse(-1)  // @timestamp (string)
        )
      )
      .map(d => (d._1.toString.toDouble, d._2.toString))

    // Print each micro-batch, then start the stream and await termination
    stream.foreachRDD(rdd => rdd.foreach(println))
    ssc.start()
    ssc.awaitTermination()
  }
}
This code detects mutations within the storage base and retrieves their content. We then keep only the fields we need (amount and @timestamp) so that they can be processed in real time later on. Retrieving the content of the documents is not really made explicit in the Couchbase documentation, which only covers detecting the mutations themselves. With this code, you can retrieve the content of the JSON documents streamed in real time into the distributed storage base.
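To give an idea of a first processing step, here is a sketch (not taken from the original application) that could be added inside main(), after the stream definition, to compute the total transaction amount over a sliding one-minute window:
// Keep only the amount of each transaction
val amounts = stream.map { case (amount, timestamp) => amount }
// Sum the amounts over a 60-second window, recomputed every 10 seconds (the batch interval)
val totalPerWindow = amounts.reduceByWindow(_ + _, Seconds(60), Seconds(10))
totalPerWindow.foreachRDD(rdd => rdd.foreach(total => println(s"Total amount over the last minute: $total")))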
You now have a better understanding of what a real-time architecture is. The Couchbase / Spark architecture you have just created allows you to process a data stream, and as you have seen, setting it up is relatively simple. If you want to go further, you can process this stream or even build your own machine learning model using Spark's ML library!