Skip to content

Commit

Permalink
Spark connector (#41)
Browse files Browse the repository at this point in the history
Spark connector for Spark 3
  • Loading branch information
angelcervera authored Sep 27, 2020
1 parent c660b14 commit 7a84c7e
Show file tree
Hide file tree
Showing 25 changed files with 1,779 additions and 105 deletions.
17 changes: 17 additions & 0 deletions .github/workflows/scala.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
name: Scala CI

on: [push, pull_request]

jobs:
build:

runs-on: ubuntu-latest

steps:
- uses: actions/checkout@v2
- name: Set up JDK 1.8
uses: actions/setup-java@v1
with:
java-version: 1.8
- name: Run tests
run: sbt +test
2 changes: 2 additions & 0 deletions .scalafmt.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
project.git = true
maxColumn = 120
370 changes: 330 additions & 40 deletions README.md

Large diffs are not rendered by default.

183 changes: 120 additions & 63 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,28 @@ lazy val commonIOVersion = "2.5"
lazy val logbackVersion = "1.1.7"
lazy val scoptVersion = "3.7.1"
lazy val akkaVersion = "2.5.31"
lazy val sparkVersion = "3.0.1"

// Releases versions
lazy val scala213 = "2.13.2"
lazy val scala212 = "2.12.11"
lazy val scalaVersions = List(scala213, scala212)

scapegoatVersion in ThisBuild := "1.4.5"

lazy val commonSettings = Seq(
crossScalaVersions := scalaVersions,
organization := "com.acervera.osm4scala",
organizationHomepage := Some(url("http://www.acervera.com")),
licenses += ("MIT", url("http://opensource.org/licenses/MIT")),
homepage in ThisBuild := Some(
url(s"https://github.com/angelcervera/osm4scala")
url(s"https://github.com/simplexspatial/osm4scala")
),
scmInfo in ThisBuild := Some(
ScmInfo(
url("https://github.com/angelcervera/osm4scala"),
"scm:git:git://github.com/angelcervera/osm4scala.git",
"scm:git:ssh://github.com:angelcervera/osm4scala.git"
url("https://github.com/simplexspatial/osm4scala"),
"scm:git:git://github.com/simplexspatial/osm4scala.git",
"scm:git:ssh://github.com:simplexspatial/osm4scala.git"
)
),
developers in ThisBuild := List(
Expand All @@ -38,10 +41,11 @@ lazy val commonSettings = Seq(
)
),
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % scalatestVersion % "test",
"org.scalacheck" %% "scalacheck" % scalacheckVersion % "test",
"commons-io" % "commons-io" % commonIOVersion % "test"
)
"org.scalatest" %% "scalatest" % scalatestVersion % Test,
"org.scalacheck" %% "scalacheck" % scalacheckVersion % Test,
"commons-io" % "commons-io" % commonIOVersion % Test
),
test in assembly := {}
)

lazy val disablingPublishingSettings =
Expand All @@ -54,19 +58,23 @@ lazy val enablingPublishingSettings = Seq(
// Bintray
bintrayPackageLabels := Seq("scala", "osm", "openstreetmap"),
bintrayRepository := "maven",
bintrayVcsUrl := Some("https://github.com/angelcervera/osm4scala.git")
bintrayVcsUrl := Some("https://github.com/simplexspatial/osm4scala.git")
)

import sbtrelease.ReleasePlugin.autoImport.ReleaseTransformations._
lazy val root = (project in file("."))
.disablePlugins(AssemblyPlugin)
.aggregate(
core,
spark,
sparkFatShaded,
commonUtilities,
examplesCounter,
examplesCounterParallel,
examplesCounterAkka,
examplesTagsExtraction,
examplesPrimitivesExtraction
examplesPrimitivesExtraction,
exampleSparkUtilities
)
.settings(
name := "osm4scala-root",
Expand All @@ -90,36 +98,74 @@ lazy val root = (project in file("."))
)
)

lazy val core = Project(id = "core", base = file("core")).settings(
commonSettings,
enablingPublishingSettings,
name := "osm4scala-core",
description := "Scala Open Street Map Pbf 2 parser. Core",
bintrayPackage := "osm4scala",
coverageExcludedPackages := "org.openstreetmap.osmosis.osmbinary.*",
PB.targets in Compile := Seq(
scalapb.gen(grpc = false) -> (sourceManaged in Compile).value
),
libraryDependencies ++= Seq(
"ch.qos.logback" % "logback-classic" % logbackVersion
lazy val core = Project(id = "core", base = file("core"))
.disablePlugins(AssemblyPlugin)
.settings(
commonSettings,
enablingPublishingSettings,
name := "osm4scala-core",
description := "Scala OpenStreetMap Pbf 2 parser. Core",
bintrayPackage := "osm4scala-core",
PB.targets in Compile := Seq(
scalapb.gen(grpc = false) -> (sourceManaged in Compile).value
)
)
)

lazy val spark = Project(id = "spark", base = file("spark"))
.enablePlugins(AssemblyPlugin)
.settings(
commonSettings,
crossScalaVersions := Seq(scala212),
enablingPublishingSettings,
name := "osm4scala-spark",
description := "Spark connector for OpenStreetMap Pbf 2 parser.",
bintrayPackage := "osm4scala-spark",
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided
),
assemblyOption in assembly := (assemblyOption in assembly).value.copy(
includeScala = false,
cacheUnzip = false,
cacheOutput = false
),
assemblyShadeRules in assembly := Seq(
ShadeRule
.rename("com.google.protobuf.**" -> "shadeproto.@1")
.inAll
)
)
.dependsOn(core)


lazy val sparkFatShaded = Project(id = "osm4scala-spark-shaded", base = file("osm4scala-spark-shaded"))
.disablePlugins(AssemblyPlugin)
.settings(
commonSettings,
crossScalaVersions := Seq(scala212),
enablingPublishingSettings,
name := "osm4scala-spark-shaded",
description := "Spark connector for OpenStreetMap Pbf 2 parser as shaded fat jar.",
bintrayPackage := "osm4scala-spark-shaded",
packageBin in Compile := (assembly in (spark, Compile)).value
)


// Examples

lazy val commonUtilities = Project(
id = "examples-common-utilities",
base = file("examples/common-utilities")
).settings(
commonSettings,
skip in publish := true,
name := "osm4scala-examples-common-utilities",
description := "Utilities shared by all examples",
libraryDependencies ++= Seq("com.github.scopt" %% "scopt" % scoptVersion)
)
lazy val commonUtilities = Project(id = "examples-common-utilities", base = file("examples/common-utilities"))
.disablePlugins(AssemblyPlugin)
.settings(
commonSettings,
skip in publish := true,
name := "osm4scala-examples-common-utilities",
description := "Utilities shared by all examples",
libraryDependencies ++= Seq("com.github.scopt" %% "scopt" % scoptVersion)
)
.disablePlugins(AssemblyPlugin)

lazy val examplesCounter =
Project(id = "examples-counter", base = file("examples/counter"))
.disablePlugins(AssemblyPlugin)
.settings(
commonSettings,
disablingPublishingSettings,
Expand All @@ -128,59 +174,70 @@ lazy val examplesCounter =
)
.dependsOn(core, commonUtilities)

lazy val examplesCounterParallel = Project(
id = "examples-counter-parallel",
base = file("examples/counter-parallel")
).settings(
lazy val examplesCounterParallel = Project(id = "examples-counter-parallel", base = file("examples/counter-parallel"))
.disablePlugins(AssemblyPlugin)
.settings(
commonSettings,
disablingPublishingSettings,
name := "osm4scala-examples-counter-parallel",
description := "Counter of primitives (Way, Node, Relation or All) using osm4scala in parallel threads"
)
.dependsOn(core, commonUtilities)

lazy val examplesCounterAkka =
Project(id = "examples-counter-akka", base = file("examples/counter-akka"))
.settings(
commonSettings,
disablingPublishingSettings,
name := "osm4scala-examples-counter-akka",
description := "Counter of primitives (Way, Node, Relation or All) using osm4scala in parallel with AKKA",
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion
)
lazy val examplesCounterAkka = Project(id = "examples-counter-akka", base = file("examples/counter-akka"))
.disablePlugins(AssemblyPlugin)
.settings(
commonSettings,
disablingPublishingSettings,
name := "osm4scala-examples-counter-akka",
description := "Counter of primitives (Way, Node, Relation or All) using osm4scala in parallel with AKKA",
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor" % akkaVersion
)
.dependsOn(core, commonUtilities)
)
.dependsOn(core, commonUtilities)

lazy val examplesTagsExtraction = Project(
id = "examples-tag-extraction",
base = file("examples/tagsextraction")
).settings(
lazy val examplesTagsExtraction = Project(id = "examples-tag-extraction", base = file("examples/tagsextraction"))
.disablePlugins(AssemblyPlugin)
.settings(
commonSettings,
disablingPublishingSettings,
name := "osm4scala-examples-tags-extraction",
description := "Extract all unique tags from the selected primitive type (Way, Node, Relation or All) using osm4scala"
)
.dependsOn(core, commonUtilities)

lazy val examplesBlocksExtraction = Project(
id = "examples-blocks-extraction",
base = file("examples/blocksextraction")
).settings(
lazy val examplesBlocksExtraction = Project(id = "examples-blocks-extraction", base = file("examples/blocksextraction"))
.disablePlugins(AssemblyPlugin)
.settings(
commonSettings,
disablingPublishingSettings,
name := "osm4scala-examples-blocks-extraction",
description := "Extract all blocks from the pbf into a folder using osm4scala."
)
.dependsOn(core, commonUtilities)

lazy val examplesPrimitivesExtraction = Project(
id = "examples-primitives-extraction",
base = file("examples/primitivesextraction")
).settings(
lazy val examplesPrimitivesExtraction =
Project(id = "examples-primitives-extraction", base = file("examples/primitivesextraction"))
.disablePlugins(AssemblyPlugin)
.settings(
commonSettings,
disablingPublishingSettings,
name := "osm4scala-examples-primitives-extraction",
description := "Extract all primitives from the pbf into a folder using osm4scala."
)
.dependsOn(core, commonUtilities)

lazy val exampleSparkUtilities = Project(id = "examples-spark-utilities", base = file("examples/spark-utilities"))
.disablePlugins(AssemblyPlugin)
.settings(
commonSettings,
disablingPublishingSettings,
name := "osm4scala-examples-primitives-extraction",
description := "Extract all primitives from the pbf into a folder using osm4scala."
crossScalaVersions := Seq(scala212),
name := "osm4scala-examples-spark-utilities",
description := "Example of different utilities using osm4scala and Spark.",
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided
)
)
.dependsOn(core, commonUtilities)
.dependsOn(spark, commonUtilities)
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,5 @@ object RelationMemberEntityTypes extends Enumeration {
val Unrecognized = Value(3)
}


// FIXME: relationTypes should be singular instead plural.
case class RelationMemberEntity(val id: Long, val relationTypes: RelationMemberEntityTypes.Value, val role: String)
Binary file not shown.
45 changes: 45 additions & 0 deletions examples/spark-utilities/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
# Spark examples

```
osm4scala-spark-utilities Example of utilities processing OSM Pbf files, using Spark and osm4scala
Usage: osm4scala-spark-utilities [counter|tag_keys] [options]
--help prints this usage text
-i, --input <file/files>
Input is a required pbf2 format set of files.
-o, --output <path> Output is a required path to store the result.
-f, --outputFormat [csv, orc, parquet, etc.]
Format that spark will used to store the result.
-c, --coalesce <coalesce>
Number of files that will generate. By default, is not going to join files.
Command: counter [options]
Primitives counter.
-t, --type <type> Primitive type [Node, Way, Relation] used to filter.
Command: tag_keys [options]
Tags extraction.
-t, --type <type> Primitive type [Node, Way, Relation] used to filter.
```

## Examples:

```shell script
bin/spark-submit \
--packages 'com.github.scopt:scopt_2.12:3.7.1,com.acervera.osm4scala:osm4scala-spark-shaded_2.12:1.0.5' \
--class com.acervera.osm4scala.examples.spark.Driver \
"osm4scala-examples-spark-utilities_2.12-1.0.5-SNAPSHOT.jar" \
counter \
-i <pbf files path> \
-o <output> \
-c 1 \
-f csv

bin/spark-submit \
--packages 'com.github.scopt:scopt_2.12:3.7.1,com.acervera.osm4scala:osm4scala-spark-shaded_2.12:1.0.5' \
--class com.acervera.osm4scala.examples.spark.Driver \
"osm4scala-examples-spark-utilities_2.12-1.0.5-SNAPSHOT.jar" \
tag_keys \
-i <pbf files path> \
-o <output> \
-c 1 \
-f csv
```
Loading

0 comments on commit 7a84c7e

Please sign in to comment.