Jan 17, 2018

Apache Spark lets you write generic UDFs. For an idiomatic implementation, however, there are a couple of things to keep in mind.
  1. Return a subtype of Option: Spark automatically treats None as null and extracts the value from Some.
  2. Your generic UDFs should accept either an Option or a regular type as input. To accomplish this, pattern match on Option and extract the value recursively. This scenario occurs when your UDF is in turn wrapped by another UDF.
If these considerations are handled correctly, the UDF has two important benefits:
  • It avoids code duplication, and
  • It handles nulls in a more idiomatic way.
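The two rules above can be sketched without Spark. The helper names `unwrap` and `textLength` are hypothetical and exist only for illustration; this is a minimal sketch of the pattern, not the UDF from this post.

```scala
// Hypothetical helper, not part of Spark: strips any number of Option layers
// and treats null as a missing value.
def unwrap(value: Any): Option[Any] = value match {
  case null        => None
  case None        => None
  case Some(inner) => unwrap(inner)
  case other       => Some(other)
}

// Hypothetical generic function written in the style described above.
// It returns Option[Int], so a caller such as Spark can map None to null.
def textLength[T](value: T): Option[Int] =
  unwrap(value).collect { case s: String => s.length }

println(textLength("spark"))            // plain value
println(textLength(Some(Some("udf"))))  // doubly wrapped value
println(textLength(None))               // missing value
```

Because every input funnels through `unwrap`, the function body only ever deals with plain values, which is where the reduction in duplicated null checks comes from.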
Here is an example of a UDF that calculates the interval between two dates.
import java.time.{LocalDate, ZoneId}
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit

import scala.util.Try

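def convertToDate[T](date: T): Option[LocalDate] = {
  if (date == null) None
  else date match {
    case dt: LocalDate => Some(dt)
    case dt: String if dt.isEmpty => None
    case dt: String =>
      // try a plain ISO date first, then fall back to an ISO date-time
      val retValue = Try {
        LocalDate.parse(dt, DateTimeFormatter.ISO_DATE)
      }.getOrElse(LocalDate.parse(dt, DateTimeFormatter.ISO_LOCAL_DATE_TIME))
      Some(retValue)
    // java.sql.Date must be matched before java.util.Date: it is a subclass whose toInstant throws
    case dt: java.sql.Date => Some(dt.toLocalDate)
    case dt: java.util.Date => Some(dt.toInstant.atZone(ZoneId.systemDefault()).toLocalDate)
    // unwrap nested Options recursively (the UDF-wrapped-by-another-UDF case)
    case dt: Option[_] => dt.flatMap(v => convertToDate(v))
    // fail with a clear message instead of a MatchError
    case other => throw new IllegalArgumentException(s"${other.getClass.getName} is not supported")
  }
}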


def interval_between[V1, V2](fromDate: V1, toDate: V2, intType: String): Option[Long] = {
  def calculateInterval(from: LocalDate, to: LocalDate, unit: String): Long = unit match {
    case "decades" => ChronoUnit.DECADES.between(from, to)
    case "years" => ChronoUnit.YEARS.between(from, to)
    case "months" => ChronoUnit.MONTHS.between(from, to)
    case "days" => ChronoUnit.DAYS.between(from, to)
    // LocalDate does not support time-based units, so measure from the start of each day
    case "hours" => ChronoUnit.HOURS.between(from.atStartOfDay(), to.atStartOfDay())
    case "minutes" => ChronoUnit.MINUTES.between(from.atStartOfDay(), to.atStartOfDay())
    case "seconds" => ChronoUnit.SECONDS.between(from.atStartOfDay(), to.atStartOfDay())
    case _ => throw new IllegalArgumentException(s"$unit is not supported")
  }

  // None (null in Spark) as soon as either date is missing
  for {
    fromDt <- convertToDate(fromDate)
    toDt <- convertToDate(toDate)
  } yield calculateInterval(fromDt, toDt, intType.toLowerCase)
}
The UDF above addresses both concerns mentioned earlier in the post. To use it, simply register it as a UDF with your SparkSession (ss below).
 ss.udf.register("interval_between", interval_between _)
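The java.time behaviour this UDF relies on can be checked without Spark. The sketch below assumes a hypothetical helper named `parseIsoDate` and made-up sample dates. It shows the ISO date / ISO date-time fallback, and that `ChronoUnit` time-based units are not supported on a `LocalDate`, which matters for the hours, minutes and seconds options.

```scala
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import java.time.temporal.ChronoUnit
import scala.util.Try

// Hypothetical helper: parse an ISO date, falling back to an ISO date-time.
def parseIsoDate(text: String): Option[LocalDate] =
  Try(LocalDate.parse(text, DateTimeFormatter.ISO_DATE))
    .orElse(Try(LocalDate.parse(text, DateTimeFormatter.ISO_LOCAL_DATE_TIME)))
    .toOption

val from = parseIsoDate("2017-10-28").get
val to = parseIsoDate("2018-01-17T09:30:00").get

// Date-based units work directly on LocalDate.
val months = ChronoUnit.MONTHS.between(from, to)
val days = ChronoUnit.DAYS.between(from, to)

// LocalDate supports date-based units only, so this attempt fails.
val hoursOnDates = Try(ChronoUnit.HOURS.between(from, to))

// atStartOfDay() yields a LocalDateTime, which does support hours.
val hours = ChronoUnit.HOURS.between(from.atStartOfDay(), to.atStartOfDay())

println(s"months=$months days=$days hours=$hours failedOnDates=${hoursOnDates.isFailure}")
```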

Posted on Wednesday, January 17, 2018 by Unknown

Testing Spark DataFrame transforms is essential, and it can be done in a reusable manner. The way I generally accomplish this is to:
  • Read the expected and test DataFrames,
  • Invoke the desired transform, and
  • Calculate the difference between the DataFrames. The only caveat is that the built-in except function is not sufficient for columns with decimal types, which require a bit of extra work.
To accomplish a generic DataFrame comparison:
  • Look at the type of each column, and when it is numeric,
  • Convert it to the corresponding Java type and then do a numeric comparison, while allowing for a configurable precision mismatch. Otherwise,
  • Just use except for the column comparison.
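To see why exact matching falls short for fractional columns, here is a minimal Spark-free sketch. The helper `approxEqual` and the sample values are hypothetical; it assumes both columns have already been collected to the driver as plain sequences.

```scala
// Hypothetical helper: compares two numeric columns, ignoring row order and
// allowing each pair of values to differ by less than `precision`.
def approxEqual(result: Seq[Double], expected: Seq[Double], precision: Double = 0.01): Boolean =
  result.length == expected.length &&
    result.sorted.zip(expected.sorted).forall { case (r, e) => math.abs(r - e) < precision }

val computed = Seq(0.1 + 0.2, 2.5) // 0.1 + 0.2 is not exactly 0.3 in floating point
val expectedValues = Seq(2.5, 0.3)

println(computed == expectedValues.reverse)    // exact comparison
println(approxEqual(computed, expectedValues)) // tolerant comparison
```

Sorting both sides first makes the check independent of row order, which is the same reason the comparison code sorts each column before collecting it.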

Comparison Code

import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.types._

def compareDF(result: Dataset[Row], expected: Dataset[Row]): Unit = {
  val expectedSchemaMap = expected.schema.map(sf => (sf.name, sf.dataType)).toMap[String, DataType]
  val resSchemaMap = result.schema.map(sf => (sf.name, sf.dataType)).toMap[String, DataType]
  expectedSchemaMap.foreach {
    // numeric columns are compared value by value, within a tolerance
    case (name: String, dType: NumericType) =>
      assert(compareNumericTypes(result, expected, resSchemaMap(name), dType, name), s"$name column was not equal")
    // for every other column, no row of result may be missing from expected
    case (name, _) =>
      assert(result.select(name).except(expected.select(name)).count() == 0, s"$name column was not equal")
  }
}

def compareNumericTypes(result: Dataset[Row], expected: Dataset[Row], resType: DataType, expType: DataType, colName: String, precision: Double = 0.01): Boolean = {
  // collect results
  val res = extractAndSortNumericRow(result, colName, resType)
  val exp = extractAndSortNumericRow(expected, colName, expType)
  // compare lengths first, then the sorted values pairwise (assumes the column has no nulls)
  res.length == exp.length && res.zip(exp).forall { case (r, e) =>
    (safelyGet(r), safelyGet(e)) match {
      // integral values must match exactly
      case (a: java.lang.Long, b: java.lang.Long) => a.longValue() == b.longValue()
      // everything else (float, double, decimal) may differ by less than precision
      case (a, b) => (a.doubleValue() - b.doubleValue()).abs < precision
    }
  }
}

// upcast types: integral values become Long, all other numbers become Double
def safelyGet(v: Number): Number = v match {
  case _: java.lang.Long | _: java.lang.Integer => java.lang.Long.valueOf(v.longValue())
  case _ => java.lang.Double.valueOf(v.doubleValue())
}

// map internal spark types to java types.
def extractAndSortNumericRow(df: Dataset[Row], colName: String, dt: DataType): Seq[Number] = {
  // sort inside Spark, then collect the single column to the driver
  val rows = df.select(colName).sort(colName).collect().toSeq
  dt match {
    case _: LongType => rows.map(_.getAs[java.lang.Long](0))
    case _: IntegerType => rows.map(_.getAs[java.lang.Integer](0))
    case _: DoubleType => rows.map(_.getAs[java.lang.Double](0))
    case _: FloatType => rows.map(_.getAs[java.lang.Float](0))
    case _: DecimalType => rows.map(_.getAs[java.math.BigDecimal](0))
    case other => throw new IllegalArgumentException(s"$other is not a supported numeric type")
  }
}
The code above does the heavy lifting for comparing DataFrames. Now all we need is a simple function that invokes the transform, and some simple ScalaTest code showing all this in action.
Function that invokes the transform and does the comparison:
def invokeAndCompare(testFileName: String, expectedFileName: String, func: Dataset[Row] => Dataset[Row]): Unit = {
  val df = readJsonDF(testFileName)
  val expected = readJsonDF(expectedFileName)
  val transformResult = func(df)
  compareDF(transformResult, expected)
}

def readJsonDF(fileName: String): Dataset[Row] = {
  ss.read.json(fileName)
}

Testing Code

Just use ScalaTest. Here is what a test looks like for your transforms.
class RandomTransformsTest extends FlatSpec with Matchers with BeforeAndAfter {
  // shared by the hooks below; assigned before each test
  var ss: SparkSession = _

  before {
    ss = SparkSession.builder().master("local[*]").getOrCreate()
  }
  after {
    //close spark session
    ss.close()
  }

  "testRandomTransform" should "give correct output for input dataframe" in {
    val testFileLoc = ""
    val expectedFileLoc = ""
    //just get the function definition, it will be invoked by invokeAndCompare with the dataframe later on.
    val func = RandomTransforms.someRandomFunc() _
    SomeObject.invokeAndCompare(testFileLoc, expectedFileLoc, func)
  }

}
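The `someRandomFunc() _` expression in the test only compiles if the transform takes its DataFrame in a second, curried parameter list; that signature is an assumption, since the post does not show it. A Spark-free sketch of the same idea, with hypothetical names (`Transforms.scale`, `invokeAndCheck`) and `Seq[Int]` standing in for `Dataset[Row]`:

```scala
// Hypothetical stand-in for a transform: configuration first, data second.
object Transforms {
  def scale(factor: Int)(data: Seq[Int]): Seq[Int] = data.map(_ * factor)
}

// Hypothetical stand-in for invokeAndCompare: applies the function and checks the result.
def invokeAndCheck(input: Seq[Int], expected: Seq[Int], func: Seq[Int] => Seq[Int]): Boolean =
  func(input) == expected

// Supplying only the first parameter list and adding `_` yields a function value
// that still expects the data; it is invoked later by invokeAndCheck.
val func = Transforms.scale(2) _

println(invokeAndCheck(Seq(1, 2, 3), Seq(2, 4, 6), func))
```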

Wrap Up:

So there we go: testing made easy for Spark DataFrames. It requires some tedious mapping for decimal numbers, but once that is developed, tests are easy to write for all your DataFrame transforms.

Posted on Wednesday, January 17, 2018 by Unknown

Oct 28, 2017

I have been working a lot with Spark and Scala. I have really come to like Scala as a language because of its numerous advantages over Java; foremost among them, type classes and default method arguments do wonders for keeping an API simple. Also, idiomatic Scala code uses higher-order functions, so it encourages a functional style of programming.
I also like Spark a lot, but I couldn't stand the inefficient way it was being used, i.e. processing a bunch of SQL queries sequentially. I strongly believe Spark wasn't designed to be used this way.
SparkSession supports executing multiple queries in parallel, provided of course that they are independent. So there was a clear optimization opportunity in orchestration, i.e. wresting control of execution while providing sufficient callback mechanisms. Thus, I developed a framework which, given a set of queries and their dependencies, builds a DAG [1]. It then uses dynamic programming to find the depth of each node. The idea is then to create stages corresponding to the nodes at each depth; as the nodes within a stage are independent, they can be executed in parallel.
[Image: DAG]
Once we have the DAG's stages, execution is pretty straightforward: use an ExecutorService and configure an implicit ExecutionContext instance backed by it.
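The depth calculation and staged execution can be sketched as follows. This is a minimal illustration under stated assumptions, not the framework's actual code: the dependency map, the function names (`depths`, `buildStages`, `runStages`), the pool size and the timeout are all hypothetical, and the depth is computed with memoized recursion as one simple form of dynamic programming.

```scala
import java.util.concurrent.Executors
import scala.collection.mutable
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// Hypothetical input: each query name mapped to the queries it depends on.
val dependencies: Map[String, Set[String]] = Map(
  "a" -> Set.empty[String],
  "b" -> Set.empty[String],
  "c" -> Set("a", "b"),
  "d" -> Set("c"),
  "e" -> Set("a")
)

// Depth of a node = 1 + the maximum depth of its dependencies (0 for roots).
// Results are memoized so each node is computed once. Assumes the graph is acyclic.
def depths(deps: Map[String, Set[String]]): Map[String, Int] = {
  val memo = mutable.Map.empty[String, Int]
  def depth(node: String): Int = memo.get(node) match {
    case Some(d) => d
    case None =>
      val parents = deps.getOrElse(node, Set.empty[String])
      val d = if (parents.isEmpty) 0 else parents.map(p => depth(p)).max + 1
      memo(node) = d
      d
  }
  deps.keys.foreach(node => depth(node))
  memo.toMap
}

// One stage per depth, ordered from the roots outwards.
def buildStages(deps: Map[String, Set[String]]): Seq[Set[String]] =
  depths(deps).groupBy(_._2).toSeq.sortBy(_._1).map { case (_, nodes) => nodes.keys.toSet }

// Runs the nodes of each stage in parallel and waits for the whole stage
// to finish before starting the next one.
def runStages(stages: Seq[Set[String]])(run: String => Unit): Unit = {
  val pool = Executors.newFixedThreadPool(4)
  implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
  try {
    stages.foreach { stage =>
      val futures = stage.toSeq.map(node => Future(run(node)))
      Await.result(Future.sequence(futures), 1.minute)
    }
  } finally pool.shutdown()
}

runStages(buildStages(dependencies))(node => println(s"running $node"))
```

In a real orchestrator the `run` callback would execute the query for that node; here it only prints the node name.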
In general, once a framework wrests control of execution, there are numerous advantages; some of the most potent are reusability, optimization and maintainability.
The framework thus developed has the following features:
  • Optimization: parallel execution of query or custom processing steps.
  • Global and local bind variable substitutions.
  • Ability to enable the explain plan by turning on a configuration option.
  • JSON-based logging using an AsyncAppender [2] (essentially a BlockingQueue: multiple threads can enqueue events, but only a single consumer writes to the log file), so it can be easily integrated with Splunk.
  • Custom UDF registration and default registration of a bunch of common UDFs.
  • Custom hooks into the execution by implementing a trait which is then invoked at the right stage by the Orchestrator (Inversion of Control).
  • Configuration-based coding (users don't need to know Scala or Spark to use it), and
  • Reusability and maintainability.
[1] DAG: https://en.wikipedia.org/wiki/Directed_acyclic_graph
[2] AsyncAppender: https://logback.qos.ch/manual/appenders.html#AsyncAppender

Posted on Saturday, October 28, 2017 by Unknown

Jun 26, 2016

Apache Zeppelin provides a web UI where you can iteratively build Spark scripts in Scala, Python, etc. (with autocomplete support), run Spark SQL queries against Hive or other stores, and visualize the results of queries or Spark DataFrames. This is somewhat akin to what IPython notebooks do for Python. Spark developers know that building, testing and fixing errors in Spark scripts can be a lengthy process (and a dull one, because it is not interactive). With Apache Zeppelin you can iteratively build and test portions of your script, which enhances your productivity significantly.

Installing and Configuring Apache Zeppelin

Ensure the following prerequisites are installed:
  • Java 8: su -c "yum install java-1.8.0-openjdk-devel"
  • Maven 3.1.x+: sudo yum install apache-maven, and then link it with sudo ln -s /usr/share/apache-maven/bin/mvn /usr/bin/mvn. If this does not work for you, you can install it the following way.
    wget http://www.eu.apache.org/dist/maven/maven-3/3.3.3/binaries/apache-maven-3.3.3-bin.tar.gz
    sudo tar -zxf apache-maven-3.3.3-bin.tar.gz -C /usr/local/
    sudo ln -s /usr/local/apache-maven-3.3.3/bin/mvn /usr/local/bin/mvn
    
  • Git: sudo yum install git
  • NPM: sudo yum install nodejs npm
  • Either download the source code from here or clone the git repository into a folder with git clone https://github.com/apache/incubator-zeppelin.git
  • Build from source: go to the incubator-zeppelin directory and run the following command from it.
    mvn clean package -Pspark-1.5 -Ppyspark -Dhadoop.version=2.6.0-cdh5.5.0 -Phadoop-2.6 -Dmaven.test.skip=true
    This command works for version 5.5 of the Cloudera distribution; make sure your versions of Hadoop and Spark are correct. In addition to installing support for Spark, this command configures Zeppelin with support for PySpark as well.
  • To configure access to the Hive metastore, copy hive-site.xml to the conf directory under Zeppelin.
  • In the conf folder, create copies of the files zeppelin-env.sh.template and zeppelin-site.xml.template as zeppelin-env.sh and zeppelin-site.xml respectively.
  • If you would like to change the port for Zeppelin, change the following property in zeppelin-site.xml.
    <property>
      <name>zeppelin.server.port</name>
      <value>8999</value>
      <description>Server port.</description>
    </property>
    
  • To start Zeppelin, use the command ./zeppelin-daemon.sh start. You can then access the Zeppelin UI at http://localhost:8999 [1]
  • To stop Zeppelin, use the command ./zeppelin-daemon.sh stop

Running Spark SQL queries against Hive and Visualizing Results

In a Zeppelin cell, type %hive to activate the interpreter with HiveQL support. You can then run a query, and visualization support is automatically activated in the output. To execute the cell, press Shift+Enter.

Building Scala scripts and plotting model outputs

You can also code in Scala or Python by activating the corresponding interpreter. The Scala and Spark interpreter is activated by default for a cell.
To visualize a Spark DataFrame, just use the z.show(df) command.

Writing documentation

Activate Markdown support in a cell by using %md. You can then add documentation along with your code. Unfortunately, LaTeX is not supported yet, but it should be in future releases.

What's missing?

Unlike IPython notebooks, there is no option to export to HTML or PDF (using LaTeX). Support for embedding LaTeX expressions is also missing, but these features should be added in future releases.

Conclusion

Although certain features are missing, Apache Zeppelin surely helps increase your productivity by reducing the time required for the build, test and fix cycle. It also provides nice visualization capabilities for your queries and DataFrames.
[1] If you changed the port as shown above; Zeppelin's default port is 8080.

Posted on Sunday, June 26, 2016 by Unknown