In this article, Alberto Paro, the author of the book Elasticsearch 5.x Cookbook – Third Edition, discusses how to use and manage Elasticsearch, covering topics such as installation and setup, mapping management, indices management, queries, aggregations and analytics, scripting, building custom plugins, and integration with Python, Java, Scala, and big data tools such as Apache Spark and Apache Pig.
Elasticsearch is a common answer to every data search need, and with its aggregation framework it can provide analytics in real time. Elasticsearch was one of the first products able to bring search to the big data world. Its cloud-native design, JSON as the standard format for both data and searches, and its HTTP-based approach are only the solid foundations of this product.
Elasticsearch solves a growing list of search, log analysis, and analytics challenges across virtually every industry. It's used by big companies such as LinkedIn, Wikipedia, Cisco, eBay, Facebook, and many others (source: https://www.elastic.co/use-cases).
In this article, we will show how to easily build a simple search geolocator with Elasticsearch, using Apache Spark for ingestion. We will develop the search geolocator application against the world GeoNames database, covering the following steps: downloading the dataset, creating an optimized index, ingesting the data with a Spark job, and finally searching and analyzing it.
All the article code is available on GitHub at https://github.com/aparo/elasticsearch-geonames-locator. All the commands below need to be executed in the code directory on Linux or macOS. The requirements are a local Elasticsearch server instance, a working local Spark installation, and SBT (http://www.scala-sbt.org/).
To populate our application, we need a database of geographical locations. One of the most famous and widely used datasets is the GeoNames geographical database, which is available for download free of charge under a Creative Commons Attribution license. It contains over 10 million geographical names and consists of over 9 million unique features, of which 2.8 million are populated places and 5.5 million are alternate names.
It can be easily downloaded from http://download.geonames.org/export/dump.
The dump directory provides CSV files divided by country, but in our case we'll take the dump with all the countries: the allCountries.zip file.
To download the dataset, we can use wget:
wget http://download.geonames.org/export/dump/allCountries.zip
Then we need to unzip it and put it in the downloads folder:
unzip allCountries.zip
mv allCountries.txt downloads
The Geoname dump has the following fields:

| No. | Attribute name | Explanation |
| --- | --- | --- |
| 1 | geonameid | Unique ID of this geoname |
| 2 | name | The name of the geoname |
| 3 | asciiname | ASCII representation of the name |
| 4 | alternatenames | Other forms of this name, generally in several languages |
| 5 | latitude | Latitude of the geoname in decimal degrees |
| 6 | longitude | Longitude of the geoname in decimal degrees |
| 7 | fclass | Feature class, see http://www.geonames.org/export/codes.html |
| 8 | fcode | Feature code, see http://www.geonames.org/export/codes.html |
| 9 | country | ISO-3166 2-letter country code |
| 10 | cc2 | Alternate country codes, comma separated, ISO-3166 2-letter country codes |
| 11 | admin1 | FIPS code of the first administrative division (subject to change to ISO code) |
| 12 | admin2 | Code for the second administrative division (a county in the US) |
| 13 | admin3 | Code for the third-level administrative division |
| 14 | admin4 | Code for the fourth-level administrative division |
| 15 | population | The population of the geoname |
| 16 | elevation | The elevation of the geoname in meters |
| 17 | gtopo30 | Digital elevation model |
| 18 | timezone | The timezone of the geoname |
| 19 | moddate | The date of the last change of this geoname |

Table 1: Dataset characteristics
Elasticsearch provides automatic schema inference for your data, but the inferred schema is rarely the best possible one, so you often need to tune it. Given the Geoname dataset, we will add a new field, location, a GeoPoint that we will use in geo searches.
Another important indexing optimization is defining the correct number of shards. In this case we have only about 11 million records, so 2 shards are enough.
The settings for creating our optimized index with mapping and shards are the following:
{
"mappings": {
"geoname": {
"properties": {
"admin1": {
"type": "keyword",
"ignore_above": 256
},
"admin2": {
"type": "keyword",
"ignore_above": 256
},
"admin3": {
"type": "keyword",
"ignore_above": 256
},
"admin4": {
"type": "keyword",
"ignore_above": 256
},
"alternatenames": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"asciiname": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"cc2": {
"type": "keyword",
"ignore_above": 256
},
"country": {
"type": "keyword",
"ignore_above": 256
},
"elevation": {
"type": "long"
},
"fclass": {
"type": "keyword",
"ignore_above": 256
},
"fcode": {
"type": "keyword",
"ignore_above": 256
},
"geonameid": {
"type": "long"
},
"gtopo30": {
"type": "long"
},
"latitude": {
"type": "float"
},
"location": {
"type": "geo_point"
},
"longitude": {
"type": "float"
},
"moddate": {
"type": "date"
},
"name": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
},
"population": {
"type": "long"
},
"timezone": {
"type": "text",
"fields": {
"keyword": {
"type": "keyword",
"ignore_above": 256
}
}
}
}
}
},
"settings": {
"index": {
"number_of_shards": "2",
"number_of_replicas": "1"
}
}
}
We can store the above JSON in a file called settings.json and create the index via the curl command:
curl -XPUT http://localhost:9200/geonames -d @settings.json
Now our index is created and ready to receive our documents.
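To double-check that the mapping was applied as expected, we can retrieve it back; this is a quick, optional verification:
curl -XGET 'http://localhost:9200/geonames/_mapping?pretty'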
Apache Spark is very handy for processing CSV and manipulating the data before saving it to storage, whether on disk or in a NoSQL store. Elasticsearch provides easy integration with Apache Spark, allowing you to write a Spark RDD to Elasticsearch with a single command.
We will build a Spark job called GeonameIngester that parses the CSV dump, converts the rows into our document model, and writes the results to Elasticsearch.
First, we need to import the required classes:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.elasticsearch.spark.rdd.EsSpark
import scala.util.Try
We define the GeonameIngester object and the SparkSession:
object GeonameIngester {
def main(args: Array[String]) {
val sparkSession = SparkSession.builder
.master("local")
.appName("GeonameIngester")
.getOrCreate()
To easily serialize complex datatypes, we switch to the Kryo encoder:
import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = org.apache.spark.sql.Encoders.kryo[A](ct)
import sparkSession.implicits._
For parsing the CSV, we need to define the Geoname schema used to read it:
val geonameSchema = StructType(Array(
StructField("geonameid", IntegerType, false),
StructField("name", StringType, false),
StructField("asciiname", StringType, true),
StructField("alternatenames", StringType, true),
StructField("latitude", FloatType, true),
StructField("longitude", FloatType, true),
StructField("fclass", StringType, true),
StructField("fcode", StringType, true),
StructField("country", StringType, true),
StructField("cc2", StringType, true),
StructField("admin1", StringType, true),
StructField("admin2", StringType, true),
StructField("admin3", StringType, true),
StructField("admin4", StringType, true),
StructField("population", DoubleType, true), // Asia population overflows Integer
StructField("elevation", IntegerType, true),
StructField("gtopo30", IntegerType, true),
StructField("timezone", StringType, true),
StructField("moddate", DateType, true)))
Now we can read all the geonames from CSV via:
val GEONAME_PATH = "downloads/allCountries.txt"
val geonames = sparkSession.sqlContext.read
.option("header", false)
.option("quote", "")
.option("delimiter", "t")
.option("maxColumns", 22)
.schema(geonameSchema)
.csv(GEONAME_PATH)
.cache()
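Before going further, a quick sanity check on the parsed DataFrame can save time; this is a minimal optional sketch, not part of the original job:
// Print the inferred structure and the number of parsed rows
geonames.printSchema()
println(s"Parsed ${geonames.count()} geonames")
Note that count() triggers a full read of the file; the cache() above avoids paying that cost twice.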
The plain CSV data is not suitable for our advanced requirements, so we define new classes to store our Geoname data.
We define a GeoPoint class to store the geo point location of our geoname:
case class GeoPoint(lat: Double, lon: Double)
We also define our Geoname class with optional and list types:
case class Geoname(geonameid: Int,
name: String,
asciiname: String,
alternatenames: List[String],
latitude: Float,
longitude: Float,
location: GeoPoint,
fclass: String,
fcode: String,
country: String,
cc2: String,
admin1: Option[String],
admin2: Option[String],
admin3: Option[String],
admin4: Option[String],
population: Double,
elevation: Int,
gtopo30: Int,
timezone: String,
moddate: String)
To reduce the conversion boilerplate, we define an implicit method that converts a String into an Option[String], returning None if it is empty or null:
implicit def emptyToOption(value: String): Option[String] = {
if (value == null) return None
val clean = value.trim
if (clean.isEmpty) {
None
} else {
Some(clean)
}
}
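With this implicit in scope, any String passed where an Option[String] is expected is converted automatically; for example (illustrative values only):
val present: Option[String] = "48" // becomes Some("48")
val absent: Option[String] = "  "  // trimmed and empty, becomes None
This is what lets us pass the plain strings of the admin1 to admin4 columns directly to the Option[String] fields of Geoname below.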
During processing, if an integer value (such as the elevation) is null, we need a function to fix this value and set it to 0. To do this, we define fixNullInt:
def fixNullInt(value: Any): Int = {
if (value == null) 0 else {
Try(value.asInstanceOf[Int]).toOption.getOrElse(0)
}
}
We can populate the records that we need to store in Elasticsearch via a map on the geonames DataFrame:
val records = geonames.map {
row =>
val id = row.getInt(0)
val lat = row.getFloat(4)
val lon = row.getFloat(5)
Geoname(id, row.getString(1), row.getString(2), Option(row.getString(3)).map(_.split(",").map(_.trim).filterNot(_.isEmpty).toList).getOrElse(Nil),
lat, lon, GeoPoint(lat, lon),
row.getString(6), row.getString(7), row.getString(8), row.getString(9),
row.getString(10), row.getString(11), row.getString(12), row.getString(13),
row.getDouble(14), fixNullInt(row.get(15)), row.getInt(16), row.getString(17), row.getDate(18).toString
)
}
The final step is to store our newly built records in Elasticsearch via:
EsSpark.saveToEs(records.toJavaRDD, "geonames/geoname", Map("es.mapping.id" -> "geonameid"))
The value "geonames/geoname" is the index/type used to store the records in Elasticsearch. To keep the same IDs for the geonames in both the CSV and Elasticsearch, we pass the additional parameter es.mapping.id, which tells the connector where to find the ID to be used in Elasticsearch (geonameid in the example above).
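By default, the elasticsearch-hadoop connector targets an Elasticsearch node on localhost:9200. If your cluster runs elsewhere, you can point the connector to it via the standard es.nodes and es.port settings on the SparkSession; a minimal sketch, with host and port to be adjusted to your environment:
val sparkSession = SparkSession.builder
  .master("local")
  .appName("GeonameIngester")
  .config("es.nodes", "localhost") // comma-separated list of Elasticsearch hosts
  .config("es.port", "9200")       // REST port of the cluster
  .getOrCreate()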
To execute a Spark job, you need to build a JAR with all the required libraries and then execute it on Spark.
The first step is done via the sbt assembly command, which generates a fat JAR containing all the required libraries.
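If you are building your own project rather than using the one from the repository, note that sbt assembly is provided by the sbt-assembly plugin, typically enabled in project/plugins.sbt (the version shown here is illustrative):
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")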
To submit the Spark job packaged in the JAR, we can use the spark-submit command:
spark-submit --class GeonameIngester target/scala-2.11/elasticsearch-geonames-locator-assembly-1.0.jar
Now you need to wait (about 20 minutes on my machine) for Spark to send all the documents to Elasticsearch and for them to be indexed.
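A quick way to verify that the ingestion is complete is to compare the document count in the index with the number of records in the dump:
curl -XGET 'http://localhost:9200/geonames/_count?pretty'
The returned count field should match the roughly 11 million records we parsed with Spark.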
After having indexed all the geonames, you can search for them. If we want to search for Moscow, we need a more complex query, because the name can live in any of the name, asciiname, or alternatenames fields, and we only want populated places, filtering out the other feature classes.
To achieve this kind of query in Elasticsearch, we can use a simple Boolean query with several should clauses to match the names and some filters to exclude the unwanted results.
We can execute it via curl:
curl -XPOST 'http://localhost:9200/geonames/geoname/_search' -d '{
"query": {
"bool": {
"minimum_should_match": 1,
"should": [
{ "term": { "name": "moscow"}},
{ "term": { "alternatenames": "moscow"}},
{ "term": { "asciiname": "moscow" }}
],
"filter": [
{ "term": { "fclass": "P" }},
{ "range": { "population": {"gt": 0}}}
]
}
},
"sort": [ { "population": { "order": "desc"}}]
}'
We used "moscow" in lowercase because it is the standard token generated for a tokenized string (the Elasticsearch text type).
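If you want to see exactly which tokens Elasticsearch generates for a given input, the _analyze API shows the analyzer output; a quick illustration using the default standard analyzer:
curl -XPOST 'http://localhost:9200/_analyze' -d '{
  "analyzer": "standard",
  "text": "Moscow"
}'
This returns a single lowercased token, moscow, which is why the term queries above use the lowercase form.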
The result of our search query will be similar to this one:
{
"took": 14,
"timed_out": false,
"_shards": { "total": 2, "successful": 2, "failed": 0 },
"hits": {
"total": 9,
"max_score": null,
"hits": [
{
"_index": "geonames",
"_type": "geoname",
"_id": "524901",
"_score": null,
"_source": {
"name": "Moscow",
"location": {
"lat": 55.752220153808594,
"lon": 37.61555862426758
},
"latitude": 55.75222,
"population": 10381222,
"moddate": "2016-04-13",
"timezone": "Europe/Moscow",
"alternatenames": [
"Gorad Maskva",
"MOW",
"Maeskuy",
....
],
"country": "RU",
"admin1": "48",
"longitude": 37.61556,
"admin3": null,
"gtopo30": 144,
"asciiname": "Moscow",
"admin4": null,
"elevation": 0,
"admin2": null,
"fcode": "PPLC",
"fclass": "P",
"geonameid": 524901,
"cc2": null
},
"sort": [
10381222
]
},
…truncated…
We processed the geonames so that we have a GeoPoint field in Elasticsearch. The Elasticsearch GeoPoint field enables a lot of geolocation queries.
One of the most common searches is finding places near a given position via a Geo Distance query. This can be achieved by modifying the above search:
curl -XPOST 'http://localhost:9200/geonames/geoname/_search' -d '{
"query": {
"bool": {
"filter": [
{
"geo_distance" : {
"distance" : "100km",
"location" : {
"lat" : 55.7522201,
"lon" : 36.6155586
}
}
},
{ "term": { "fclass": "P" }},
{ "range": { "population": {"gt": 0}}}
]
}
},
"sort": [ { "population": { "order": "desc"}}]
}'
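If, instead of sorting by population, we want the nearest places first, Elasticsearch also supports sorting by distance from a point via _geo_distance; the query part stays the same, only the sort clause changes:
"sort": [
  {
    "_geo_distance": {
      "location": { "lat": 55.7522201, "lon": 36.6155586 },
      "order": "asc",
      "unit": "km"
    }
  }
]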
Having indexed all the geonames, we can check the completeness of our dataset and execute analytics on it. For example, it's useful to check how many geonames there are for each country, and the feature classes of each top country, to evaluate their distribution. This can be easily achieved with an Elasticsearch aggregation in a single query:
curl -XPOST 'http://localhost:9200/geonames/geoname/_search' -d ' {
"size": 0,
"aggs": {
"geoname_by_country": {
"terms": {
"field": "country",
"size": 5
},
"aggs": {
"feature_by_country": {
"terms": {
"field": "fclass",
"size": 5
}
}
}
}
}
}'
The result will be something similar to the following:
{
"took": 477,
"timed_out": false,
"_shards": {
"total": 2,
"successful": 2,
"failed": 0
},
"hits": {
"total": 11301974,
"max_score": 0,
"hits": [
]
},
"aggregations": {
"geoname_by_country": {
"doc_count_error_upper_bound": 113415,
"sum_other_doc_count": 6787106,
"buckets": [
{
"key": "US",
"doc_count": 2229464,
"feature_by_country": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 82076,
"buckets": [
{
"key": "S",
"doc_count": 1140332
},
{
"key": "H",
"doc_count": 506875
},
{
"key": "T",
"doc_count": 225276
},
{
"key": "P",
"doc_count": 192697
},
{
"key": "L",
"doc_count": 79544
}
]
}
},…truncated…
These are simple examples of how easy it is to index and search data with Elasticsearch.
Integrating Elasticsearch with Apache Spark is quite trivial: the core of the work is designing your index and your data model so that you can use them efficiently.
Once your data is correctly indexed for your use case, Elasticsearch is able to provide your results or analytics in a matter of milliseconds.
In this article, we learned how to easily build a simple search geolocator with Elasticsearch using Apache Spark for ingestion.