
Reading and Storing Data on HDFS with Spark

1. First, copy these three files from the cluster (hive-site.xml, core-site.xml, hdfs-site.xml) into the project's resources directory (required; otherwise the program fails with errors). See the layout sketch below.
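For a standard Maven project this means src/main/resources. A sketch of the expected layout (assuming Maven's default conventions):

src/main/resources/
    hive-site.xml
    core-site.xml
    hdfs-site.xml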

2. Code. Each of the following test programs runs as-is.

1) test03.java

import org.apache.spark.sql.SparkSession;
import java.text.ParseException;

public class test03 {
    public static void main(String[] args) throws ParseException {
        SparkSession spark = SparkSession
                .builder()
                .appName("Java Spark Hive Example")
                .master("local[*]")
                //.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
                .config("hadoop.home.dir", "/user/hive/warehouse")
                .enableHiveSupport()
                .getOrCreate();
        spark.sql("SELECT * FROM mt1").show();
    }
}

2) Hive03.scala

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.HiveContext

object Hive03 {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("test").setMaster("local[2]")
    val sc = new SparkContext(conf)
    val sqlContext = new HiveContext(sc)
    sqlContext.table("mt1")          // table name in dbName.tableName format
      .registerTempTable("person")   // register it as a temporary table
    sqlContext.sql(
      """
        | select *
        | from person
        | limit 10
      """.stripMargin).show()
    sc.stop()
  }
}
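Note that in Spark 2.x, HiveContext and registerTempTable are deprecated in favor of SparkSession and createOrReplaceTempView. A minimal equivalent sketch (the object name Hive03New is just for illustration):

import org.apache.spark.sql.SparkSession

object Hive03New {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("test")
      .master("local[2]")
      .enableHiveSupport()  // required for access to the Hive metastore
      .getOrCreate()
    // createOrReplaceTempView is the non-deprecated replacement for registerTempTable
    spark.table("mt1").createOrReplaceTempView("person")
    spark.sql("select * from person limit 10").show()
    spark.stop()
  }
}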

 3) SparkHiveText.scala

import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.{SparkConf, SparkContext}

object SparkHiveText {
  def main(args: Array[String]) {
    val conf = new SparkConf().setMaster("local").setAppName("SparkHiveText")
    val sc = new SparkContext(conf)
    val hc = new HiveContext(sc)
    hc.sql("select * from mt1").show()
    sc.stop()
  }
}

At runtime you may see an error like: (null) entry in command string: null chmod 0700

Solution:

Download hadoop.dll from https://github.com/SweetInk/hadoop-common-2.7.1-bin and copy it into the c:\windows\system32 directory.
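Alternatively, instead of copying the DLL, you can point hadoop.home.dir at a local Hadoop binary distribution before creating the SparkSession, as the test03 variant near the end of this article does. A one-line sketch (the path is an assumption; use wherever your distribution is unpacked):

// assumed local path to a Hadoop binary distribution containing bin\winutils.exe
System.setProperty("hadoop.home.dir", "C:\\hadoop-common-2.2.0")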

Run result:

[Screenshot: output of SELECT * FROM mt1]

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>test</groupId>
<artifactId>test</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<spark.version>2.2.1</spark.version>
<scala.version>2.11</scala.version>
</properties>


<dependencies>

<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
<version>1.3.4</version>
</dependency>

<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- my own additions (1) -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.1</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.sparkjava/spark-core -->
<dependency>
<groupId>com.sparkjava</groupId>
<artifactId>spark-core</artifactId>
<version>2.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-repl -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_2.11</artifactId>
<version>2.2.1</version>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>5.6.3</version>
</dependency>
<!-- my own additions (2) -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.41</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<!-- adapter between the Simple Logging Facade for Java (slf4j-api.jar) and the concrete log4j logging implementation -->

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.12</version>
</dependency>


<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.12</version>
</dependency>
<!-- -->
<!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>


</dependencies>

<build>
<plugins>

<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>

</plugins>
</build>
</project>

pom.xml

--------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Reading data with Spark and writing the result into Hive

import org.apache.spark.sql.{SaveMode, SparkSession}

object Hive05 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder()
      .master("local[*]")
      .appName("wer")
      //.config("spark.sql.warehouse.dir", "/user/hive/warehouse") // this line is optional
      .enableHiveSupport()
      .getOrCreate()
    // mt1 is my own Hive table; person is the temporary view created from it
    spark.table("mt1").createOrReplaceTempView("person")
    val tt = spark.sql("select BUS_NO,CITY_NO,INS_TIME from person")
    tt.show()
    tt.write.mode(SaveMode.Overwrite).saveAsTable("test05")
  }
}
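The examples above all go through the Hive metastore, but Spark can also read from and write to HDFS paths directly, which is the other case in this article's title. A minimal sketch (the namenode address hdfs://192.168.1.107:9000 and the paths are assumptions; substitute your cluster's values):

import org.apache.spark.sql.{SaveMode, SparkSession}

object Hdfs01 {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("hdfs-demo")
      .getOrCreate()
    // read a plain text file from HDFS as a Dataset[String]
    val lines = spark.read.textFile("hdfs://192.168.1.107:9000/data/input.txt")
    lines.show(10)
    // write the data back to HDFS in Parquet format
    lines.write.mode(SaveMode.Overwrite).parquet("hdfs://192.168.1.107:9000/data/output")
    spark.stop()
  }
}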

 ---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

Problems encountered: Hive on Spark has many pitfalls, for example Scala and Spark version compatibility issues, and so on.

import org.apache.spark.sql.SparkSession;
import java.text.ParseException;

public class test03 {
    public static void main(String[] args) throws ParseException {
        // point hadoop.home.dir at a local Hadoop distribution (fixes the Windows chmod/winutils error)
        System.setProperty("hadoop.home.dir", "C:\\hadoop-common-2.2.0");
        SparkSession spark = SparkSession
                .builder()
                .appName("Java Spark Hive Example")
                .master("local[*]")
                //.config("spark.sql.warehouse.dir", "/user/hive/warehouse")
                //.config("hadoop.home.dir", "/user/hive/warehouse")
                .enableHiveSupport()
                .getOrCreate();
        spark.sql("SELECT * FROM mt1").show();
    }
}


 

Reference blog: https://blog.csdn.net/woshixuye/article/details/53461975

2. Also, do not use Scala 2.12.x. I use 2.11, set through Maven:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>test</groupId>
<artifactId>test</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<spark.version>2.2.1</spark.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-server</artifactId>
<version>1.2.4</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-protocol</artifactId>
<version>1.2.4</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-common</artifactId>
<version>1.2.4</version>
</dependency>
<dependency>
<groupId>org.apache.hbase</groupId>
<artifactId>hbase-client</artifactId>
<version>1.2.4</version>
</dependency>
<!---->
<dependency>
<groupId>org.codehaus.jettison</groupId>
<artifactId>jettison</artifactId>
<version>1.3.4</version>
</dependency>


<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql-kafka-0-10_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-mllib_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<!-- my own additions (1) -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.11</artifactId>
<version>2.2.1</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_2.11</artifactId>
<version>2.2.1</version>
<scope>provided</scope>
</dependency>
<!-- https://mvnrepository.com/artifact/com.sparkjava/spark-core -->
<dependency>
<groupId>com.sparkjava</groupId>
<artifactId>spark-core</artifactId>
<version>2.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-repl -->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-repl_2.11</artifactId>
<version>2.2.1</version>
</dependency>

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch-spark-20_2.11</artifactId>
<version>5.6.3</version>
</dependency>
<!-- my own additions (2) -->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.17</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
</dependency>

<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.12</version>
</dependency>


<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.12</version>
</dependency>
<!-- -->
<!-- https://mvnrepository.com/artifact/com.101tec/zkclient -->
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.10</version>
</dependency>
</dependencies>
<build>
<plugins>

<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<version>2.15.2</version>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
</plugin>

<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.6.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
</configuration>
</plugin>

<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.19</version>
<configuration>
<skip>true</skip>
</configuration>
</plugin>

</plugins>
</build>
</project>

* Configuring Hive to use Spark as its execution engine

-- Using Hive on Spark is very simple:
-- just set Hive's execution engine to spark with the set hive.execution.engine command
-- (the default is mr)
set hive.execution.engine=spark;
-- here you can also point spark.master at a Spark standalone master URL
set spark.master=spark://192.168.1.107:7077;
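Hive on Spark also accepts the usual Spark resource settings through set. A brief sketch of commonly tuned values (the numbers are illustrative placeholders, not recommendations):

-- executor sizing for the Spark jobs Hive launches (values are examples only)
set spark.executor.memory=2g;
set spark.executor.cores=2;
set spark.executor.instances=4;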