Spark insert / append a record to RDD / DataFrame (S3)

In many circumstances, one might want to add data to Spark, e.g. when receiving/processing records via Spark Streaming. Spark is changing rather quickly, and so are the ways to accomplish this task (things will probably change again once 1.6 with the DataSet API is released).

Ideally, I’d like the streaming module to append/insert records into a DataFrame, to be batch-processed later on by other modules.

I ended up just saving data to S3, and then using a separate batch process that loaded the records into a DataFrame and saved it as a Parquet file (and then yet another process that merged the Parquet files; a sketch of the merge step is below).
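For the merge step, a minimal sketch (the paths and partition count are hypothetical, and a SQLContext named sqlContext is assumed):

// read many small Parquet files, repartition into fewer, larger ones
val merged = sqlContext.read.parquet("s3n://bucket/incoming/")   // hypothetical input path
merged.coalesce(8)                                               // 8 output files; tune to taste
  .write.parquet("s3n://bucket/merged/")                         // hypothetical output path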


For completeness, here are the old notes:

Spark Streaming suggests a couple of ways in which DStreams (RDDs) can be output: basically as text or as serialized objects (ref). That seems somewhat limited.
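For instance, a minimal sketch of those two output methods (assuming a DStream named records and a hypothetical S3 bucket; each batch interval produces a new directory of part files):

records.saveAsTextFiles("s3n://bucket/stream/records", "txt")   // plain text
records.saveAsObjectFiles("s3n://bucket/stream/records-ser")    // serialized objects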

P.S. insertInto has been deprecated in favor of write.mode(SaveMode.Append).saveAsTable(tableName) (ref).
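For example (df being an existing DataFrame and "events" a hypothetical table name):

import org.apache.spark.sql.SaveMode

df.write.mode(SaveMode.Append).saveAsTable("events")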

Using S3 for data storage works quite nicely; here are some tips and good points:


Spark SQL comes with a builtin org.apache.spark.sql.parquet.DirectParquetOutputCommitter, which can be more efficient than the default Parquet output committer when writing data to S3. [ref]
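To enable it, one can set the (Spark 1.5-era) configuration key, e.g.:

sqlContext.setConf("spark.sql.parquet.output.committer.class",
  "org.apache.spark.sql.parquet.DirectParquetOutputCommitter")

Note that direct committers are generally unsafe with speculative execution enabled, since there is no temporary output to roll back.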


One may also consider using sqlContext.createExternalTable(tableName, warehouseDirectory) in conjunction with sqlContext.refreshTable(tableName).
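A hedged sketch of that combination (assuming a HiveContext, since refreshTable lives there in Spark 1.x, and a hypothetical S3 warehouse directory):

val tableName = "events"                                 // hypothetical
val warehouseDirectory = "s3n://bucket/warehouse/events" // hypothetical

hiveContext.createExternalTable(tableName, warehouseDirectory)
// ... after another process drops new files into warehouseDirectory:
hiveContext.refreshTable(tableName)   // invalidates cached metadata so new files are visible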




While it is possible to save an RDD (serialized or as text), many of the ML components rely on DataFrames, so saving a DataFrame might be more effective.
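A minimal sketch of going from an RDD to a DataFrame before saving (the Record case class, the input rdd, and the output path are all hypothetical):

case class Record(id: Long, value: String)

import sqlContext.implicits._            // brings in rdd.toDF()
val df = rdd                             // rdd: RDD[String], e.g. CSV lines
  .map(_.split(","))
  .map(p => Record(p(0).toLong, p(1)))
  .toDF()
df.write.parquet("s3n://bucket/records.parquet")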

MissingRequirement Exception

Need to add to build.sbt:

fork := true

Otherwise, you may get the following exception:

[error] (run-main-0) scala.reflect.internal.MissingRequirementError: class scala.Option in JavaMirror with ClasspathFilter(
[error] parent = URLClassLoader with NativeCopyLoader with RawResources(
[error] urls = List(target/scala-2.10/classes, 
not found.
 at scala.reflect.internal.MissingRequirementError$.signal(MissingRequirementError.scala:16)
 at scala.reflect.internal.MissingRequirementError$.notFound(MissingRequirementError.scala:17)
 at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:48)
 at scala.reflect.internal.Mirrors$RootsBase.getModuleOrClass(Mirrors.scala:61)
 at scala.reflect.internal.Mirrors$RootsBase.staticModuleOrClass(Mirrors.scala:72)
 at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:119)
 at scala.reflect.internal.Mirrors$RootsBase.staticClass(Mirrors.scala:21)
 at org.apache.spark.sql.catalyst.ScalaReflection$$typecreator1$1.apply(ScalaReflection.scala:91)
 at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe$lzycompute(TypeTags.scala:231)
 at scala.reflect.api.TypeTags$WeakTypeTagImpl.tpe(TypeTags.scala:231)
 at org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:76)
 at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:91)
 at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
 at org.apache.spark.sql.catalyst.ScalaReflection$class.schemaFor(ScalaReflection.scala:64)
 at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:29)
 at org.apache.spark.sql.SQLContext.createDataFrame(SQLContext.scala:383)
 at org.activeintel.spark.Driver$.main(Driver.scala:25)
 at org.activeintel.spark.Driver.main(Driver.scala)
 at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
 at sun.reflect.NativeMethodAccessorImpl.invoke(
 at sun.reflect.DelegatingMethodAccessorImpl.invoke(
 at java.lang.reflect.Method.invoke(


HiveContext Exception


[error] Exception in thread "main" java.lang.RuntimeException: Tables created with SQLContext must be TEMPORARY. Use a HiveContext instead.

When working with a HiveContext, DataFrames can also be saved as persistent tables using the saveAsTable command. Unlike the registerTempTable command, saveAsTable will materialize the contents of the dataframe and create a pointer to the data in the Hive metastore. Persistent tables will still exist even after your Spark program has restarted, as long as you maintain your connection to the same metastore. A DataFrame for a persistent table can be created by calling the table method on a SQLContext with the name of the table.

This could be addressed by adding to build.sbt:

libraryDependencies += "org.apache.spark" %% "spark-hive" % sparkVersion // to add HiveContext
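A sketch of using the HiveContext so that saveAsTable produces a persistent table (the Parquet path and table name are hypothetical):

import org.apache.spark.sql.hive.HiveContext

val hiveContext = new HiveContext(sc)                              // sc: existing SparkContext
val df = hiveContext.read.parquet("s3n://bucket/records.parquet")  // hypothetical path
df.write.saveAsTable("events")                                     // persisted via the Hive metastore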


Hive Error

Mkdirs failed to create file:/user/hive/warehouse/example/_temporary/0/_temporary/attempt_201512081950_0000_m_000001_0




keywords: persist parquet insert append row record rdd add save






2 Responses to Spark insert / append a record to RDD / DataFrame (S3)

  1. feliks says:

    Hello guys, any clues on how to solve the Hive error you got in this case?
