怎么实现Spark SchemaRDD隐式转换

今天就跟大家聊聊有关怎么实现Spark SchemaRDD隐式转换,可能很多人都不太了解,为了让大家更加了解,小编给大家总结了以下内容,希望大家根据这篇文章可以有所收获。

SchemaRDD在Spark SQL中已经被我们使用到,这篇文章简单地介绍一下如果将标准的RDD(org.apache.spark.rdd.RDD)转换成SchemaRDD,并进行SQL相关的操作。

01/**  

这是因为people是普通的RDD,而registerTempTable函数不属于RDD类,只有通过SchemaRDD的实例才可以调用,所以这么调用会出现错误,解决办法有两个:
  (1)registerTempTable函数是SQLContext类中的,所以我们可以将people转换成SchemaRDD,如下:

02 * User: 过往记忆03 * Date: 14-12-1604 * Time: 下午10:1605 * bolg: http://www.iteblog.com06 * 本文地址:http://www.iteblog.com/archives/122407 * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货08 * 过往记忆博客微信公共帐号:iteblog_hadoop09 */10scala> val peopleSchema =sqlContext.createSchemaRDD(people)11peopleSchema:org.apache.spark.sql.SchemaRDD =12SchemaRDD[29] at RDD at SchemaRDD.scala:10313==Query Plan ==14==Physical Plan ==15ExistingRdd [name#4,age#5], MapPartitionsRDD[28] at16 mapPartitions at basicOperators.scala:21717 18scala> peopleSchema.registerTempTable(“people”)19warning:there were 1deprecation warning(s); re-run with-deprecation fordetails

  这么调用就可以将people转成SchemaRDD。
  (2)、上面的方法是通过显示地调用sqlContext.createSchemaRDD将普通的RDD转成SchemaRDD。其实我们还可以通过Scala的隐式语法来进行转换。我们先来看看createSchemaRDD函数的定义

1/**2* Creates a SchemaRDD from an RDD of case classes.3*4* @group userf5*/6implicitdefcreateSchemaRDD[A <:Product:TypeTag](rdd:RDD[A]) ={7    SparkPlan.currentContext.set(self)8    newSchemaRDD(this, SparkLogicalPlan(ExistingRdd.fromProductRdd(rdd))(self))9}

  在定义createSchemaRDD的时候用到了implicit 关键字,所以我们在使用的时候可以通过下面语句使用

1scala> import sqlContext.createSchemaRDD2import sqlContext.createSchemaRDD3 4scala> people.registerAsTable(“people”)5warning:there were 1deprecation warning(s); re-run with-deprecation fordetails

  这样就隐身地将people转换成SchemaRDD了。这是因为Spark可以隐式地将包含case class的RDD转换成SchemaRDD。

看完上述内容,你们对怎么实现Spark SchemaRDD隐式转换有进一步的了解吗?如果还想了解更多知识或者相关内容,请关注亿速云行业资讯频道,感谢大家的支持。

Published by

风君子

独自遨游何稽首 揭天掀地慰生平

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注