
Building A Search Geo Locator with Elasticsearch and Spark


In this article, Alberto Paro, the author of the book Elasticsearch 5.x Cookbook – Third Edition, discusses how to use and manage Elasticsearch, covering topics such as installation and setup, mapping management, index management, queries, aggregations and analytics, scripting, building custom plugins, and integration with Python, Java, Scala, and big data tools such as Apache Spark and Apache Pig.


Background

Elasticsearch is a common answer to almost every data search need, and with its aggregation framework it can provide analytics in real time. Elasticsearch was one of the first pieces of software able to bring search into the Big Data world. Its cloud-native design, JSON as the standard format for both data and search, and its HTTP-based approach are just the solid foundations of this product.

Elasticsearch solves a growing list of search, log analysis, and analytics challenges across virtually every industry. It is used by big companies such as LinkedIn, Wikipedia, Cisco, eBay, Facebook, and many others (source: https://www.elastic.co/use-cases).

In this article, we will show how to easily build a simple search geolocator with Elasticsearch using Apache Spark for ingestion.

Objective

In this article, we will develop a search geolocator application using the world GeoNames database. To make this happen, the following steps will be covered:

  • Data collection
  • Optimized Index creation
  • Ingestion via Apache Spark
  • Searching for a location name
  • Searching for a city given a location position
  • Executing some analytics on the dataset.

All the article code is available on GitHub at https://github.com/aparo/elasticsearch-geonames-locator. All the commands below need to be executed in the code directory on Linux/macOS. The requirements are a local Elasticsearch server instance, a working local Spark installation, and SBT (http://www.scala-sbt.org/).
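
Before running anything, it is worth checking that the local Elasticsearch instance is reachable (assuming the default endpoint on port 9200):

curl http://localhost:9200

If Elasticsearch is up, it answers with a small JSON document containing the node name, the cluster name, and the version.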

Data collection

To populate our application, we need a database of geo locations. One of the most famous and widely used datasets is the GeoNames geographical database, which is available for download free of charge under a Creative Commons Attribution license. It contains over 10 million geographical names and consists of over 9 million unique features, of which 2.8 million are populated places and 5.5 million are alternate names.

It can be easily downloaded from http://download.geonames.org/export/dump.

The dump directory provides CSV files divided by country, but in our case we will take the dump with all the countries: the allCountries.zip file.

To download the dump we can use wget:

wget http://download.geonames.org/export/dump/allCountries.zip

Then we need to unzip it and put it in the downloads folder:

unzip allCountries.zip
mv allCountries.txt downloads

The Geoname dump has the following fields:

  1. geonameid: Unique ID for this geoname
  2. name: The name of the geoname
  3. asciiname: ASCII representation of the name
  4. alternatenames: Other forms of this name, generally in several languages
  5. latitude: Latitude in decimal degrees of the geoname
  6. longitude: Longitude in decimal degrees of the geoname
  7. fclass: Feature class (see http://www.geonames.org/export/codes.html)
  8. fcode: Feature code (see http://www.geonames.org/export/codes.html)
  9. country: ISO-3166 two-letter country code
  10. cc2: Alternate country codes, comma separated, ISO-3166 two-letter
  11. admin1: FIPS code of the first administrative division (subject to change to ISO code)
  12. admin2: Code for the second administrative division (a county in the US)
  13. admin3: Code for the third-level administrative division
  14. admin4: Code for the fourth-level administrative division
  15. population: The population of the geoname
  16. elevation: The elevation in meters of the geoname
  17. gtopo30: Digital elevation model
  18. timezone: The timezone of the geoname
  19. moddate: The date of the last change of this geoname

Table 1: Dataset characteristics

Optimized Index creation

Elasticsearch provides automatic schema inference for your data, but the inferred schema is not always the best possible one. Often you need to tune it for:

  • Removing fields that are not required
  • Managing geo fields
  • Optimizing string fields that would otherwise be indexed twice, in both their tokenized and keyword versions

Given the Geoname dataset, we will add a new field, location, a GeoPoint that we will use in geo searches.

Another important optimization for indexing is defining the correct number of shards. In this case we have only about 11 million records, so 2 shards are enough.

The settings for creating our optimized index, with mapping and shards, are the following:

{
  "mappings": {
    "geoname": {
      "properties": {
        "admin1": {
          "type": "keyword",
          "ignore_above": 256
        },
        "admin2": {
          "type": "keyword",
          "ignore_above": 256
        },
        "admin3": {
          "type": "keyword",
          "ignore_above": 256
        },
        "admin4": {
          "type": "keyword",
          "ignore_above": 256
        },
        "alternatenames": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "asciiname": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "cc2": {
          "type": "keyword",
          "ignore_above": 256
        },
        "country": {
          "type": "keyword",
          "ignore_above": 256
        },
        "elevation": {
          "type": "long"
        },
        "fclass": {
          "type": "keyword",
          "ignore_above": 256
        },
        "fcode": {
          "type": "keyword",
          "ignore_above": 256
        },
        "geonameid": {
          "type": "long"
        },
        "gtopo30": {
          "type": "long"
        },
        "latitude": {
          "type": "float"
        },
        "location": {
          "type": "geo_point"
        },
        "longitude": {
          "type": "float"
        },
        "moddate": {
          "type": "date"
        },
        "name": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        },
        "population": {
          "type": "long"
        },
        "timezone": {
          "type": "text",
          "fields": {
            "keyword": {
              "type": "keyword",
              "ignore_above": 256
            }
          }
        }
      }
    }
  },
  "settings": {
    "index": {
      "number_of_shards": "2",
      "number_of_replicas": "1"
    }
  }
}

We can store the above JSON in a file called settings.json and create the index via the curl command:

curl -XPUT http://localhost:9200/geonames -d @settings.json

Now our index is created and ready to receive our documents.
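
To verify that the index has been created with the expected mapping, we can ask Elasticsearch for it (assuming the same local endpoint):

curl -XGET http://localhost:9200/geonames

Note that from Elasticsearch 6.0 onwards, curl calls that send a JSON body also require an explicit -H 'Content-Type: application/json' header; on 5.x, as used here, it is optional.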

Ingestion via Apache Spark

Apache Spark is very handy for processing CSV files and manipulating the data before saving it to storage, whether on disk or in a NoSQL store. Elasticsearch provides easy integration with Apache Spark, allowing you to write a Spark RDD to Elasticsearch with a single command.

We will build a Spark job called GeonameIngester that will execute the following steps:

  1. Initialize the Spark Job
  2. Parse the CSV
  3. Defining our required structures and conversions
  4. Populating our classes
  5. Writing the RDD to Elasticsearch
  6. Executing the Spark Job

Initialize the Spark Job

We need to import the required classes:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._
import org.elasticsearch.spark.rdd.EsSpark

import scala.util.Try

We define the GeonameIngester object and the SparkSession:

object GeonameIngester {
  def main(args: Array[String]) {
    val sparkSession = SparkSession.builder
      .master("local")
      .appName("GeonameIngester")
      .getOrCreate()

To easily serialize complex datatypes, we switch to the Kryo encoder:

import scala.reflect.ClassTag
implicit def kryoEncoder[A](implicit ct: ClassTag[A]) = org.apache.spark.sql.Encoders.kryo[A](ct)

import sparkSession.implicits._

Parse the CSV

For parsing the CSV, we need to define the Geoname schema that will be used to read it:

val geonameSchema = StructType(Array(
  StructField("geonameid", IntegerType, false),
  StructField("name", StringType, false),
  StructField("asciiname", StringType, true),
  StructField("alternatenames", StringType, true),
  StructField("latitude", FloatType, true),
  StructField("longitude", FloatType, true),
  StructField("fclass", StringType, true),
  StructField("fcode", StringType, true),
  StructField("country", StringType, true),
  StructField("cc2", StringType, true),
  StructField("admin1", StringType, true),
  StructField("admin2", StringType, true),
  StructField("admin3", StringType, true),
  StructField("admin4", StringType, true),
  StructField("population", DoubleType, true), // Asia population overflows Integer
  StructField("elevation", IntegerType, true),
  StructField("gtopo30", IntegerType, true),
  StructField("timezone", StringType, true),
  StructField("moddate", DateType, true)))

Now we can read all the geonames from the CSV via:

val GEONAME_PATH = "downloads/allCountries.txt"


val geonames = sparkSession.sqlContext.read
  .option("header", false)
  .option("quote", "")
  .option("delimiter", "t")
  .option("maxColumns", 22)
  .schema(geonameSchema)
  .csv(GEONAME_PATH)
  .cache()

Defining our required structures and conversions

The plain CSV data is not suitable for our advanced requirements, so we define new classes to store our Geoname data.

We define a GeoPoint case class to store the geo point location of our geoname.

case class GeoPoint(lat: Double, lon: Double)

We also define our Geoname class with optional and list types:

case class Geoname(geonameid: Int,
                   name: String,
                   asciiname: String,
                   alternatenames: List[String],
                   latitude: Float,
                   longitude: Float,
                   location: GeoPoint,
                   fclass: String,
                   fcode: String,
                   country: String,
                   cc2: String,
                   admin1: Option[String],
                   admin2: Option[String],
                   admin3: Option[String],
                   admin4: Option[String],
                   population: Double,
                   elevation: Int,
                   gtopo30: Int,
                   timezone: String,
                   moddate: String)

To reduce the boilerplate of the conversion, we define an implicit method that converts a String into an Option[String], returning None when the value is null or empty:

implicit def emptyToOption(value: String): Option[String] = {
  if (value == null) return None
  val clean = value.trim
  if (clean.isEmpty) {
    None
  } else {
    Some(clean)
  }
}

During processing, a numeric value such as the elevation can be null; we need a function that fixes this value and sets it to 0. To do this we define fixNullInt:

def fixNullInt(value: Any): Int = {
  if (value == null) 0 else {
    Try(value.asInstanceOf[Int]).toOption.getOrElse(0)
  }
}
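
As a quick illustration of how these two helpers behave (the values below are made up for the example and are not part of the job):

// Hypothetical values, only to show the conversions defined above
val someAdmin: Option[String] = emptyToOption("48")  // Some("48")
val noAdmin: Option[String] = emptyToOption("  ")    // None
val safeElevation: Int = fixNullInt(null)            // 0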

Populating our classes

We can populate the records that we need to store in Elasticsearch via a map on the geonames DataFrame.

val records = geonames.map {
  row =>
    val id = row.getInt(0)
    val lat = row.getFloat(4)
    val lon = row.getFloat(5)
    // Split the comma-separated alternate names into a clean list
    val alternatenames = Option(row.getString(3))
      .map(_.split(",").map(_.trim).filterNot(_.isEmpty).toList)
      .getOrElse(Nil)
    // admin1..admin4 are plain strings here; the implicit emptyToOption
    // converts them to Option[String] when the Geoname is built
    Geoname(id, row.getString(1), row.getString(2), alternatenames,
      lat, lon, GeoPoint(lat, lon),
      row.getString(6), row.getString(7), row.getString(8), row.getString(9),
      row.getString(10), row.getString(11), row.getString(12), row.getString(13),
      row.getDouble(14), fixNullInt(row.get(15)), row.getInt(16), row.getString(17),
      row.getDate(18).toString)
}

Writing the RDD to Elasticsearch

The final step is to store our newly built DataFrame of records in Elasticsearch via:

EsSpark.saveToEs(records.toJavaRDD, "geonames/geoname", Map("es.mapping.id" -> "geonameid"))

The value "geonames/geoname" is the index/type to be used to store the records in Elasticsearch. To keep the same ID for the geonames in both the CSV and Elasticsearch, we pass an additional parameter, es.mapping.id, that tells the connector where to find the ID to be used in Elasticsearch (geonameid in the above example).
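
By default the elasticsearch-hadoop connector assumes Elasticsearch is reachable on localhost:9200. If that is not the case, the connection settings can be passed in the same configuration map; a minimal sketch (the host and port values here are just placeholders):

EsSpark.saveToEs(
  records.toJavaRDD,
  "geonames/geoname",
  Map(
    "es.mapping.id" -> "geonameid", // use the geonameid field as the document _id
    "es.nodes" -> "localhost",      // Elasticsearch host(s)
    "es.port" -> "9200"             // Elasticsearch HTTP port
  )
)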

Executing the Spark Job

To execute a Spark job you need to build a JAR with all the required libraries and then execute it on Spark.

The first step is done via the sbt assembly command, which will generate a fat JAR with only the required libraries.
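
The build definition is part of the repository; for reference, a minimal build.sbt for this kind of job would look roughly like the following (the version numbers are indicative, not taken from the project):

// build.sbt (sketch)
scalaVersion := "2.11.8"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.1.0" % "provided",   // provided by spark-submit at runtime
  "org.elasticsearch" %% "elasticsearch-spark-20" % "5.4.0"   // Elasticsearch Spark 2.x connector
)

// project/plugins.sbt (sketch)
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")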

To submit the Spark job contained in the JAR, we can use the spark-submit command:

spark-submit --class GeonameIngester target/scala-2.11/elasticsearch-geonames-locator-assembly-1.0.jar

Now you need to wait (about 20 minutes on my machine) for Spark to send all the documents to Elasticsearch and for them to be indexed.
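
While the job runs (and once it has finished), the number of documents already indexed can be checked with the count API (assuming the local endpoint used above):

curl -XGET 'http://localhost:9200/geonames/_count'

The count grows until it reaches the roughly 11 million geonames of the dump.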

Searching for a location name

After having indexed all the geonames, you can search for them. If we want to search for Moscow, we need a somewhat complex query because:

  • Cities in GeoNames are entities with fclass="P"
  • We want to skip cities with no population
  • We sort by population in descending order, so the most populated come first
  • The city name can be in the name, alternatenames, or asciiname field

To achieve this kind of query in Elasticsearch, we can use a simple Boolean query with several should clauses to match the names and some filters to exclude unwanted results.

We can execute it via curl:

curl -XPOST 'http://localhost:9200/geonames/geoname/_search' -d '{
 "query": {
  "bool": {
   "minimum_should_match": 1,
   "should": [
  { "term": { "name": "moscow"}},
       { "term": { "alternatenames": "moscow"}},
       { "term": { "asciiname": "moscow" }}
   ],
   "filter": [
     { "term": { "fclass": "P" }},
     { "range": { "population": {"gt": 0}}}
   ]
  }
 },
 "sort": [ {   "population": { "order": "desc"}}]
}'

We used "moscow" in lowercase because that is the standard token generated for a tokenized string (the Elasticsearch text type).
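
To see exactly which tokens Elasticsearch stores for a text field, the _analyze API is a quick check (a sketch against the index created above):

curl -XPOST 'http://localhost:9200/geonames/_analyze' -d '{
  "field": "name",
  "text": "Moscow"
}'

The response contains a single lowercased token, moscow, which is why the term queries above use the lowercase form.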

The result of the search will be similar to this one:

{
  "took": 14,
  "timed_out": false,
  "_shards": {    "total": 2, "successful": 2, "failed": 0 },
  "hits": {
    "total": 9,
    "max_score": null,
    "hits": [
      {
        "_index": "geonames",
        "_type": "geoname",
        "_id": "524901",
        "_score": null,
        "_source": {
          "name": "Moscow",
          "location": {
            "lat": 55.752220153808594,
            "lon": 37.61555862426758
          },
          "latitude": 55.75222,
          "population": 10381222,
          "moddate": "2016-04-13",
          "timezone": "Europe/Moscow",
          "alternatenames": [
            "Gorad Maskva",
            "MOW",
            "Maeskuy",
            ....
          ],
          "country": "RU",
          "admin1": "48",
          "longitude": 37.61556,
          "admin3": null,
          "gtopo30": 144,
          "asciiname": "Moscow",
          "admin4": null,
          "elevation": 0,
          "admin2": null,
          "fcode": "PPLC",
          "fclass": "P",
          "geonameid": 524901,
          "cc2": null
        },
        "sort": [
          10381222
        ]
      },

Searching for cities given a location position

We have processed the geonames so that in Elasticsearch we have a GeoPoint field. The Elasticsearch GeoPoint field enables a lot of geolocation queries.

One of the most common searches is to find cities near a given position via a Geo Distance Query. This can be achieved by modifying the above search into:

curl -XPOST 'http://localhost:9200/geonames/geoname/_search' -d '{
 "query": {
  "bool": {
   "filter": [
     {
       "geo_distance": {
         "distance": "100km",
         "location": {
           "lat": 55.7522201,
           "lon": 36.6155586
         }
       }
     },
     { "term": { "fclass": "P" }},
     { "range": { "population": {"gt": 0}}}
   ]
  }
 },
 "sort": [ { "population": { "order": "desc"}}]
}'
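
If instead of the most populated cities we want the nearest ones first, the sort block can be swapped for a geo distance sort (a sketch on the same query):

 "sort": [
   {
     "_geo_distance": {
       "location": { "lat": 55.7522201, "lon": 36.6155586 },
       "order": "asc",
       "unit": "km"
     }
   }
 ]

With this sort, each hit also reports its distance in kilometers from the given point.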

Executing some analytics on the dataset

Having indexed all the geonames, we can check the completeness of our dataset and execute analytics on it. For example, it is useful to check how many geonames there are for each country, and the feature classes of every top country, to evaluate their distribution. This can be easily achieved using an Elasticsearch aggregation in a single query:

curl -XPOST 'http://localhost:9200/geonames/geoname/_search' -d ' {
  "size": 0,
  "aggs": {
    "geoname_by_country": {
      "terms": {
        "field": "country",
        "size": 5
      },
      "aggs": {
        "feature_by_country": {
          "terms": {
            "field": "fclass",
            "size": 5
          }
        }
      }
    }
  }
}'

The result will be something similar to:

{
  "took": 477,
  "timed_out": false,
  "_shards": {
    "total": 2,
    "successful": 2,
    "failed": 0
  },
  "hits": {
    "total": 11301974,
    "max_score": 0,
    "hits": [

    ]
  },
  "aggregations": {
    "geoname_by_country": {
      "doc_count_error_upper_bound": 113415,
      "sum_other_doc_count": 6787106,
      "buckets": [
        {
          "key": "US",
          "doc_count": 2229464,
          "feature_by_country": {
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 82076,
            "buckets": [
              {
                "key": "S",
                "doc_count": 1140332
              },
              {
                "key": "H",
                "doc_count": 506875
              },
              {
                "key": "T",
                "doc_count": 225276
              },
              {
                "key": "P",
                "doc_count": 192697
              },
              {
                "key": "L",
                "doc_count": 79544
              }
            ]
          }
        },…truncated…

These are simple examples of how easy it is to index and search data with Elasticsearch.

Integrating Elasticsearch with Apache Spark is trivial: the core part is designing your index and your data model so that you can use them efficiently.

After having indexed your data correctly for your use case, Elasticsearch is able to provide your results or analytics in a few milliseconds.

Summary

In this article, we learned how to easily build a simple search geolocator with Elasticsearch using Apache Spark for ingestion.
