Spark insert / append a record to RDD / DataFrame ( S3 )

In many circumstances one might want to add data to Spark, e.g. when receiving/processing records via Spark Streaming. Spark is changing rather quickly, and so are the ways to accomplish this task (things will probably change again once 1.6 with the DataSet API is released).

Ideally, I’d like the streaming module to append/insert records into a DataFrame, to be batch-processed later on by other modules.

I ended up just saving the data to S3, and then using a separate batch process that loaded the records into a DataFrame and saved it as a Parquet file (and yet another process that merged the Parquet files).
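
As a rough sketch of that setup (the S3 paths, the JSON record format, and the function names below are placeholders of mine, not from the original code): the streaming side dumps each micro-batch to S3 as text, and a separate batch job later loads those records into a DataFrame and appends them to a Parquet dataset:

import org.apache.spark.sql.{SQLContext, SaveMode}
import org.apache.spark.streaming.dstream.DStream

// streaming side: persist the raw (here assumed JSON-encoded) records to S3;
// each micro-batch is written to its own timestamped directory under s3RawPath
def saveRawBatches(records: DStream[String], s3RawPath: String): Unit =
  records.saveAsTextFiles(s3RawPath + "/batch")

// batch side: load the accumulated records into a DataFrame and append them to a Parquet dataset
def consolidateToParquet(sqlContext: SQLContext, s3RawPath: String, s3ParquetPath: String): Unit = {
  val df = sqlContext.read.json(s3RawPath + "/batch*")   // schema is inferred from the JSON records
  df.write.mode(SaveMode.Append).parquet(s3ParquetPath)
}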

 

For completeness, here are the old notes:

Spark Streaming suggests a couple of ways in which DStreams (RDDs) can be output: basically as text or as serialized objects (ref). That seems somewhat limited.
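
For reference, the two built-in output operations look roughly like this (stream stands for an existing DStream[String]; the S3 prefix is a placeholder):

stream.saveAsTextFiles("s3n://bucket/raw/records")    // one directory of plain-text files per batch interval
stream.saveAsObjectFiles("s3n://bucket/raw/records")  // same layout, but as serialized object files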

https://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.DataFrame

P.S. insertInto has been deprecated in favor of write.mode(SaveMode.Append).saveAsTable(tableName) (ref).
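
In code that change amounts to something like the following (df stands for an existing DataFrame; the table name is a placeholder):

import org.apache.spark.sql.SaveMode

// deprecated: df.insertInto("events")
df.write.mode(SaveMode.Append).saveAsTable("events")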

Using S3 for data storage works quite nicely; here are some tips: http://www.slideshare.net/databricks/spark-summit-eu-2015-lessons-from-300-production-users#17

Here are also some good points: https://www.appsflyer.com/blog/the-bleeding-edge-spark-parquet-and-s3/

 

Spark SQL comes with a built-in org.apache.spark.sql.parquet.DirectParquetOutputCommitter, which can be more efficient than the default Parquet output committer when writing data to S3. [ref]
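
Something along these lines should enable it (the property name is, as far as I know, the one used by Spark 1.5/1.6; check the docs for your version):

sqlContext.setConf("spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")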

 

You may also consider using “sqlContext.createExternalTable(tableName, warehouseDirectory)” in conjunction with “sqlContext.refreshTable(tableName)”: https://forums.databricks.com/questions/1838/dataframe-write-append-to-parquet-table-partition.html
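
A rough sketch of that approach (the table name and S3 directory are placeholders; sqlContext needs to be backed by a persistent metastore, i.e. a HiveContext):

// point a table at the Parquet files already sitting in S3
sqlContext.createExternalTable("events", "s3n://bucket/warehouse/events")

// later, after more Parquet files have been appended to that directory,
// invalidate the cached metadata so the new files are picked up
sqlContext.refreshTable("events")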


While it is possible to save an RDD (serialized or as text), many of the ML components rely on DataFrames, so saving a DataFrame might be more effective.

Additional References

https://databricks.gitbooks.io/databricks-spark-reference-applications/content/logs_analyzer/chapter3/save_an_rdd_to_a_database.html

 

Various

MissingRequirementError Exception

Need to add the following to build.sbt:

fork := true

Otherwise you may get the following exception:

[error] (run-main-0) scala.reflect.internal.MissingRequirementError: class scala.Option in JavaMirror with ClasspathFilter(
[error] parent = URLClassLoader with NativeCopyLoader with RawResources(
[error] urls = List(target/scala-2.10/classes, 
not found.
 at scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:16)
 at scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:17)
 at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
 at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61)
 at scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass(Mirrors.scala:72)
 at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:119)
 at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:21)
 at org.apache.spark.sql.catalyst.ScalaReflection$$typecreator1$1.apply(ScalaReflection.scala:91)
 at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231)
 at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231)
 at org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:76)
 at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:91)
 at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
 at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64)
 at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
 at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:383)
 at org.activeintel.spark.Driver$.main(Driver.scala:25)
 at org.activeintel.spark.Driver.main(Driver.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
 at java.lang.reflect.Method.invoke(Method.java:497)

 

HiveContext Exception

 

[error] Exception in thread "main" java.lang.RuntimeException: Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.

When working with a HiveContext, DataFrames can also be saved as persistent tables using the saveAsTable command. Unlike the registerTempTable command, saveAsTable will materialize the contents of the DataFrame and create a pointer to the data in the Hive metastore. Persistent tables will still exist even after your Spark program has restarted, as long as you maintain your connection to the same metastore. A DataFrame for a persistent table can be created by calling the table method on a SQLContext with the name of the table. https://spark.apache.org/docs/latest/sql-programming-guide.html#saving-to-persistent-tables

This could be addressed by adding to build.sbt:

libraryDependencies += "org.apache.spark" %% "spark-hive" % sparkVersion // to add HiveContext
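
With that dependency in place, a HiveContext can be constructed from the existing SparkContext, and saveAsTable will then create a persistent (non-temporary) table; roughly (sc is an existing SparkContext, and "events" is a placeholder table name):

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.hive.HiveContext

val sqlContext = new HiveContext(sc)
// df should be a DataFrame created via this HiveContext
df.write.mode(SaveMode.Append).saveAsTable("events")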

 

Hive Error

 

java.io.IOException: Mkdirs failed to create file:/user/hive/warehouse/example/_temporary/0/_temporary/attempt_201512081950_0000_m_000001_0


keywords: persist parquet insert append row record rdd add save row



