
Distributed Cluster Setup (Spark + HBase)

    The setup below follows tutorials found online; the whole process is fairly tedious and calls for some patience. Once the configuration is complete, distributed modeling can be carried out. Building a distributed Spark computing platform requires great care, otherwise all kinds of strange problems will appear.

I. Building the distributed Spark computing platform

1 Select three servers (64-bit CentOS 6.5)

172.168.16.70 (master node)

172.168.16.71 (slave node)

172.168.16.72 (slave node)

All subsequent operations are performed as root. It is best not to use an ordinary user, since some operations may fail for lack of permissions.

2 Edit the hosts file

The hosts file must be edited on all three servers: open it with vim /etc/hosts and append the following to the end of the file:

172.168.16.70 Master
172.168.16.71 Slave1
172.168.16.72 Slave2

After editing, save the file; changes to /etc/hosts take effect immediately (there is no need to source it).

3 Passwordless SSH configuration
(1) Install and start SSH

Check whether openssh and rsync are installed:

rpm -qa|grep openssh 

rpm -qa|grep rsync

If they are missing, install them with the following commands:

yum install openssh-server openssh-clients (install the SSH server and clients)

yum install rsync (rsync is a remote data synchronization tool)

service sshd restart (restart the service)

(2) Configure the Master to log in to all Slaves without a password

The following configuration steps are performed on the Master node.

1) Generate a key pair on the Master node by running:

ssh-keygen -t rsa -P ''

This generates the key pair id_rsa and id_rsa.pub, stored by default under /root/.ssh.

2) Still on the Master node, append id_rsa.pub to the authorized keys:

cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys

3) Edit the SSH configuration file /etc/ssh/sshd_config and uncomment the following lines:

RSAAuthentication yes  # enable RSA authentication

PubkeyAuthentication yes  # enable public/private key authentication

AuthorizedKeysFile .ssh/authorized_keys  # path of the authorized keys file

4) Restart the SSH service so the settings take effect:

service sshd restart

5) Verify that passwordless login to the local machine works:

ssh localhost

6) Copy the public key to all Slave machines with the following commands:

scp /root/.ssh/id_rsa.pub root@Slave1:/root/

scp /root/.ssh/id_rsa.pub root@Slave2:/root/

Next, configure the Slave nodes. The following steps are performed on Slave1.

1) Create the .ssh directory under /root/ (skip this if it already exists):

mkdir /root/.ssh

2) Append the Master's public key to Slave1's authorized_keys file:

cat /root/id_rsa.pub >> /root/.ssh/authorized_keys

3) Edit /etc/ssh/sshd_config as in steps 3 and 4 of the Master configuration above.

4) From the Master, verify passwordless SSH login to Slave1:

ssh Slave1

5) Delete the id_rsa.pub file from the /root/ directory:

rm -f /root/id_rsa.pub

Repeat the five steps above to give Slave2 the same configuration.

(3) Configure all Slaves to log in to the Master without a password

The following configuration steps are performed on Slave1.

1) Generate Slave1's own key pair and append its public key to authorized_keys by running:

ssh-keygen -t rsa -P ''

cat /root/.ssh/id_rsa.pub >> /root/.ssh/authorized_keys

2) Copy Slave1's public key id_rsa.pub to the /root/ directory on the Master node:

scp /root/.ssh/id_rsa.pub root@Master:/root/

The following configuration steps are performed on the Master node.

1) Append Slave1's public key to the Master's authorized_keys file:

cat ~/id_rsa.pub >> ~/.ssh/authorized_keys

2) Delete the id_rsa.pub file copied over from Slave1:

rm -f /root/id_rsa.pub

After the configuration is done, test passwordless login from Slave1 to the Master:

ssh 172.168.16.70

Follow the same steps to set up passwordless login between Slave2 and the Master. After this, the Master can log in to every Slave without a password, and every Slave can log in to the Master without a password.

4 Disable SELinux and the firewall

1) Disable SELinux:

Permanent: vim /etc/sysconfig/selinux

Change SELINUX=enforcing to SELINUX=disabled, then reboot.

Immediate (until reboot): setenforce 0

Check the status: getenforce

2) Disable the firewall:

Permanent: chkconfig iptables off, then reboot

Immediate: service iptables stop

Check the firewall status: service iptables status

Check whether the firewall starts on boot: chkconfig iptables --list

If the network needs to be restarted: service network restart (CentOS 6.x uses the service command; systemctl is not available)

5 Install the base environment (Java and Scala)
(1) Java 1.8 setup

1) Download jdk-8u121-linux-x64.tar.gz and extract it under /usr/local (so that it matches JAVA_HOME below): tar -zxvf jdk-8u121-linux-x64.tar.gz -C /usr/local

2) Add the Java environment variables to /etc/profile:

export JAVA_HOME=/usr/local/jdk1.8.0_121
PATH=$JAVA_HOME/bin:$PATH
CLASSPATH=.:$JAVA_HOME/lib/rt.jar
export JAVA_HOME PATH CLASSPATH

3) Save the file and reload the configuration: source /etc/profile

(2) Scala 2.11.8 setup

1) Download the Scala package scala-2.11.8.tgz, extract it with tar -zxvf scala-2.11.8.tgz, and move the extracted directory to /usr/share/scala so that it matches SCALA_HOME below.

2) Add the Scala environment variables to /etc/profile:

export SCALA_HOME=/usr/share/scala
export PATH=$SCALA_HOME/bin:$PATH

3) Save the file and reload the configuration: source /etc/profile

6 Hadoop 2.7.7 fully distributed setup

The following steps are performed on the Master node:

1) Download the binary package hadoop-2.7.7.tar.gz

2) Extract it and move it to the target directory:

tar -zxvf hadoop-2.7.7.tar.gz

mv hadoop-2.7.7 /opt

3) Edit the configuration files

Edit /etc/profile and add the following:

export HADOOP_HOME=/opt/hadoop-2.7.7
export PATH=$PATH:$HADOOP_HOME/bin
export PATH=$PATH:$HADOOP_HOME/sbin
export HADOOP_MAPRED_HOME=$HADOOP_HOME
export HADOOP_COMMON_HOME=$HADOOP_HOME
export HADOOP_HDFS_HOME=$HADOOP_HOME
export YARN_HOME=$HADOOP_HOME
export HADOOP_ROOT_LOGGER=INFO,console
export HADOOP_COMMON_LIB_NATIVE_DIR=$HADOOP_HOME/lib/native
export HADOOP_OPTS="-Djava.library.path=$HADOOP_HOME/lib"

After editing, run source /etc/profile

Edit $HADOOP_HOME/etc/hadoop/hadoop-env.sh

and set JAVA_HOME as follows:

 export JAVA_HOME=/usr/local/jdk1.8.0_121

Edit $HADOOP_HOME/etc/hadoop/slaves

Remove the original localhost entry and list the slave hostnames, one per line:

Slave1
Slave2

Edit $HADOOP_HOME/etc/hadoop/core-site.xml:

<configuration>
<property>
<name>fs.defaultFS</name>
<value>hdfs://Master:9000</value>
</property>
<property>
<name>io.file.buffer.size</name>
<value>131072</value>
</property>
<property>
<name>hadoop.tmp.dir</name>
<value>/opt/hadoop-2.7.7/tmp</value>
</property>
</configuration>

Edit $HADOOP_HOME/etc/hadoop/hdfs-site.xml:

<configuration>
<property>
<name>dfs.namenode.secondary.http-address</name>
<value>Master:50090</value>
</property>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.namenode.name.dir</name>
<value>file:/opt/hadoop-2.7.7/hdfs/name</value>
</property>
<property>
<name>dfs.datanode.data.dir</name>
<value>file:/opt/hadoop-2.7.7/hdfs/data</value>
</property>
</configuration>

Copy the template to create the XML file:

cp mapred-site.xml.template mapred-site.xml

Edit $HADOOP_HOME/etc/hadoop/mapred-site.xml:

<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
<property>
<name>mapreduce.jobhistory.address</name>
<value>Master:10020</value>
</property>
<property>
<name>mapreduce.jobhistory.webapp.address</name>
<value>Master:19888</value>
</property>
</configuration>

Edit $HADOOP_HOME/etc/hadoop/yarn-site.xml:

<configuration>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
<property>
<name>yarn.resourcemanager.address</name>
<value>Master:8032</value>
</property>
<property>
<name>yarn.resourcemanager.scheduler.address</name>
<value>Master:8030</value>
</property>
<property>
<name>yarn.resourcemanager.resource-tracker.address</name>
<value>Master:8031</value>
</property>
<property>
<name>yarn.resourcemanager.admin.address</name>
<value>Master:8033</value>
</property>
<property>
<name>yarn.resourcemanager.webapp.address</name>
<value>Master:8088</value>
</property>
</configuration>

4) Copy the Hadoop directory from the Master node to Slave1 and Slave2:

scp -r /opt/hadoop-2.7.7 root@Slave1:/opt

scp -r /opt/hadoop-2.7.7 root@Slave2:/opt

5) On Slave1 and Slave2, edit /etc/profile in the same way as on the Master.

6) Start the cluster from the Master node. Before the first start, format the NameNode:

hadoop namenode -format

Then start the cluster:

/opt/hadoop-2.7.7/sbin/start-all.sh

7) Check whether the cluster started successfully:

jps

The Master should show:

SecondaryNameNode, ResourceManager, NameNode

Each Slave should show:

NodeManager, DataNode

7 Spark 2.1.0 fully distributed setup

The following steps are all performed on the Master node.

1) Download the binary package spark-2.1.0-bin-hadoop2.7.tgz

2) Extract it and move it to the target directory:

tar -zxvf spark-2.1.0-bin-hadoop2.7.tgz

mv spark-2.1.0-bin-hadoop2.7 /opt

3) Edit the configuration files. Add the following to /etc/profile:

export SPARK_HOME=/opt/spark-2.1.0-bin-hadoop2.7
export PATH=$PATH:$SPARK_HOME/bin

Copy spark-env.sh.template to spark-env.sh:

cp spark-env.sh.template spark-env.sh

Edit $SPARK_HOME/conf/spark-env.sh and add the following:

export JAVA_HOME=/usr/local/jdk1.8.0_121
export SCALA_HOME=/usr/share/scala
export HADOOP_HOME=/opt/hadoop-2.7.7
export HADOOP_CONF_DIR=/opt/hadoop-2.7.7/etc/hadoop
export SPARK_MASTER_IP=172.168.16.70
export SPARK_MASTER_HOST=172.168.16.70
export SPARK_LOCAL_IP=172.168.16.70
export SPARK_WORKER_MEMORY=1g
export SPARK_WORKER_CORES=2
export SPARK_HOME=/opt/spark-2.1.0-bin-hadoop2.7
SPARK_LOCAL_DIRS=/opt/spark-2.1.0-bin-hadoop2.7
export SPARK_DIST_CLASSPATH=$(/opt/hadoop-2.7.7/bin/hadoop classpath)

Copy slaves.template to slaves:

cp slaves.template slaves

Edit $SPARK_HOME/conf/slaves, comment out localhost, and add the following:

Master
Slave1
Slave2

4) Copy the configured Spark directory to the Slave1 and Slave2 nodes:

scp -r /opt/spark-2.1.0-bin-hadoop2.7 root@Slave1:/opt

scp -r /opt/spark-2.1.0-bin-hadoop2.7 root@Slave2:/opt

5) Adjust the configuration on Slave1 and Slave2.

On Slave1 and Slave2, edit /etc/profile to add the Spark variables, just as on the Master. Then, on Slave1 and Slave2, edit $SPARK_HOME/conf/spark-env.sh and change export SPARK_LOCAL_IP=172.168.16.70 to the IP address of the respective node.

6) Start the cluster from the Master node:

/opt/spark-2.1.0-bin-hadoop2.7/sbin/start-all.sh

7) Check whether the cluster started successfully:

jps

Compared with the Hadoop processes, the Master now additionally shows: Master

Each Slave additionally shows: Worker
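
To confirm that the cluster actually accepts and runs jobs (not just that the daemons are up), a small end-to-end check can be submitted from the Master node. The following is a minimal smoke-test sketch, assuming the standalone master listens on its default port 7077; save it as smoke.py and run it with spark-submit smoke.py:

from pyspark.sql import SparkSession

# connect to the standalone master started by start-all.sh above
spark = (SparkSession.builder
         .master("spark://Master:7077")
         .appName("cluster-smoke-test")
         .getOrCreate())

# distribute a trivial computation across the workers; the expected output is 499500
print(spark.sparkContext.parallelize(range(1000), 6).sum())

spark.stop()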

II. Building the distributed HBase database
1 ZooKeeper environment configuration
(1) ZooKeeper setup

1) Download zookeeper-3.4.10.tar.gz and extract it (the steps below assume the extracted directory is placed at /opt/zookeeper/zookeeper3.4).

2) Add the ZooKeeper environment variables to /etc/profile:

export ZK_HOME=/opt/zookeeper/zookeeper3.4
export PATH=.:${JAVA_HOME}/bin:${SCALA_HOME}/bin:${SPARK_HOME}/bin:${ZK_HOME}/bin:$PATH

3) Save the file and reload the configuration: source /etc/profile

4) Configure the ZooKeeper environment on every machine in the cluster in the same way. (For a multi-node ensemble, each node also needs a conf/zoo.cfg listing the servers, e.g. server.1=Master:2888:3888, and a matching myid file in its data directory; this is assumed to be in place before starting.)

(2) Start ZooKeeper (run this on every machine in the cluster)

Change to the ZooKeeper bin directory:

cd /opt/zookeeper/zookeeper3.4/bin

and run:

zkServer.sh start

After it starts successfully, check its status with:

zkServer.sh status
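
As an extra check from Python, the sketch below probes each node with ZooKeeper's built-in "ruok" four-letter command; a healthy server replies "imok". This is only a connectivity probe added here for convenience, not part of the original setup; it assumes the default client port 2181, and if the four-letter commands are disabled on your build, zkServer.sh status above is sufficient:

import socket

def zk_ruok(host, port=2181, timeout=5):
    # send ZooKeeper's "ruok" four-letter command; a healthy server answers "imok"
    s = socket.create_connection((host, port), timeout=timeout)
    try:
        s.sendall(b"ruok")
        return s.recv(16) == b"imok"
    finally:
        s.close()

for h in ("Master", "Slave1", "Slave2"):
    try:
        print(h, "ok" if zk_ruok(h) else "NOT ok")
    except Exception as e:
        print(h, "unreachable: {}".format(e))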

2 HBase environment configuration
(1) HBase setup

1) Download hbase-1.2.6-bin.tar.gz and extract it (the steps below assume the extracted directory is placed at /opt/hbase/hbase1.2).

2) Add the HBase environment variables to /etc/profile:

export HBASE_HOME=/opt/hbase/hbase1.2
export PATH=.:${JAVA_HOME}/bin:${SCALA_HOME}/bin:${SPARK_HOME}/bin:${HBASE_HOME}/bin:$PATH

3) Save the file and reload the configuration: source /etc/profile

(2) Edit the configuration files

Change to /opt/hbase/hbase1.2/conf and edit the hbase-env.sh file, adding the following settings:

export JAVA_HOME=/usr/local/jdk1.8.0_121
export HADOOP_HOME=/opt/hadoop-2.7.7
export HBASE_HOME=/opt/hbase/hbase1.2
export HBASE_CLASSPATH=/opt/hadoop-2.7.7/etc/hadoop
export HBASE_PID_DIR=/root/hbase/pids
export HBASE_MANAGES_ZK=false

Edit the hbase-site.xml file and set its <configuration> section as follows:

<configuration>
<property>
<name>hbase.rootdir</name>
<value>hdfs://Master:9000/hbase</value>
<description>The directory shared by region servers.</description>
</property>
<!-- ZooKeeper client port -->
<property>
<name>hbase.zookeeper.property.clientPort</name>
<value>2181</value>
</property>
<!-- session timeout -->
<property>
<name>zookeeper.session.timeout</name>
<value>120000</value>
</property>
<!-- tolerate clock skew between servers -->
<property>
<name>hbase.master.maxclockskew</name>
<value>150000</value>
</property>
<!-- ZooKeeper quorum hosts -->
<property>
<name>hbase.zookeeper.quorum</name>
<value>Master,Slave1,Slave2</value>
</property>
<!-- temporary data directory -->
<property>
<name>hbase.tmp.dir</name>
<value>/root/hbase/tmp</value>
</property>
<!-- true enables fully distributed mode -->
<property>
<name>hbase.cluster.distributed</name>
<value>true</value>
</property>
<!-- HBase master address -->
<property>
<name>hbase.master</name>
<value>Master:60000</value>
</property>
</configuration>

Edit the regionservers file so that it contains:

Slave1
Slave2

(3) Start HBase

First copy the HBase directory to every slave node:

scp -r /opt/hbase root@Slave1:/opt
scp -r /opt/hbase root@Slave2:/opt

Once Hadoop and ZooKeeper have been started successfully, change to the HBase bin directory on the master node:

cd /opt/hbase/hbase1.2/bin

and run:

start-hbase.sh

After a successful start, use the jps command on each machine to check: the master should additionally show HMaster, and each region server (Slave1 and Slave2) should show HRegionServer.
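
The Python examples in the next part talk to HBase through happybase, which connects to the HBase Thrift server on port 9090. That server is not started by start-hbase.sh, so it has to be launched separately on the master, for example with hbase-daemon.sh start thrift. Under that assumption, the following minimal sketch checks the connection by listing the existing tables:

import happybase

# connect to the HBase Thrift server running on the master node
conn = happybase.Connection(host="172.168.16.70", port=9090)
conn.open()
print(conn.tables())   # expect [] on a freshly created cluster
conn.close()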

III. Example: distributed Spark reading and writing HBase

1 Starting the computing environment
1. Start the Hadoop processes: start-all.sh
2. Start the ZooKeeper process on each node: zkServer.sh start
3. Start the HBase processes: start-hbase.sh
4. Run zkCli.sh (in the ZooKeeper bin directory) to enter the ZooKeeper shell
5. Inside the shell, run rmr /hbase to clear HBase's ZooKeeper state (only needed when resetting HBase), then quit to exit
6. Submit the task with spark-submit: spark-submit --master yarn --deploy-mode client --num-executors 6 --driver-memory 1g --executor-memory 5g --executor-cores 2 demo.py
7. If the Hadoop NameNode is stuck in safe mode: bin/hadoop dfsadmin -safemode leave (check the state with bin/hadoop dfsadmin -safemode get)

2 Example code for operating the HBase database

#!/usr/bin/env python
# coding=utf-8
import sys

import happybase


class HBase_Operation():
    """Small helper around happybase for basic HBase table operations."""

    def __init__(self, host="172.168.16.70"):
        # connect to the HBase Thrift server on the master node
        self.connection = happybase.Connection(host=host, port=9090, autoconnect=True,
                                               timeout=None, transport='buffered')

    def create_table(self, table):
        # create a table with a single column family "data"
        self.connection.create_table(table, {"data": {}})

    def delete_table(self, table):
        # disable and drop the table
        self.connection.delete_table(table, True)

    def check_table(self, table):
        # list the regions of the table and count its rows region by region
        t = self.connection.table(table)
        regions = t.regions()
        print("regions of table: {}".format(regions))
        print("num regions: {}".format(len(regions)))
        num_entry = 0
        for region in regions:
            start_key = region["start_key"]
            end_key = region["end_key"]
            for k, v in t.scan(row_start=start_key, row_stop=end_key):
                print("rowkey: {}".format(k))
                num_entry += 1
        print("number of entry: {}".format(num_entry))

    def refactor_table(self, table):
        # drop the table and recreate it with the same column family
        self.delete_table(table)
        self.create_table(table)

    def query_all_table_name(self):
        # return the names of all existing tables
        table_list = self.connection.tables()
        return table_list

    def shutdown(self):
        self.connection.close()


if __name__ == "__main__":
    hbase_op = HBase_Operation()

    # first argument: operation; second argument: table name (except for "query")
    operation = sys.argv[1] if len(sys.argv) > 1 else ''
    table = ''
    if not operation:
        print("please specify the operation.")
        hbase_op.shutdown()
        sys.exit(-1)

    elif operation != "query":
        table = sys.argv[2]

    if operation == "create":
        hbase_op.create_table(table=table)
        print("create table {}".format(table))

    elif operation == "delete":
        hbase_op.delete_table(table=table)

    elif operation == "check":
        hbase_op.check_table(table=table)

    elif operation == "query":
        table_list = hbase_op.query_all_table_name()
        for table in table_list:
            print(table + "\n")

    elif operation == "refactor":
        hbase_op.refactor_table(table)

    else:
        print("please specify the correct operations.")

    hbase_op.shutdown()
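
A brief usage sketch for the script above. The file name hbase_ops.py and the table name demo_table are only illustrative; the command-line invocations are shown as comments, followed by a direct happybase round trip through the same "data" column family that create_table sets up:

# hypothetical command-line usage of the script above (saved as hbase_ops.py):
#   python hbase_ops.py create demo_table
#   python hbase_ops.py query
#   python hbase_ops.py check demo_table
#   python hbase_ops.py delete demo_table
import happybase

conn = happybase.Connection(host="172.168.16.70", port=9090)
conn.open()
table = conn.table("demo_table")
# write one cell into the "data" column family, then read the row back
table.put(b"row1", {b"data:msg": b"hello hbase"})
print(table.row(b"row1"))
conn.close()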

3 Spark reading and writing HBase (SIFT feature extraction)

#!/usr/bin/env python
# coding=utf-8
import base64
import io

import cv2
import numpy as np
from PIL import Image

import happybase
from pyspark.sql import SparkSession


def write_sift_for_each_partition(keys):
    # each Spark partition processes one HBase region: read its images,
    # extract SIFT features, and write the features back to the feature table
    sift_count = 80
    connection = happybase.Connection(host="172.168.16.70",
                                      port=9090,
                                      autoconnect=False,
                                      timeout=None,
                                      transport='buffered',
                                      protocol='binary')
    connection.open()
    image_table = connection.table('claim:table1')
    feature_table = connection.table('claim:table2')

    for key in keys:
        for img_id, bindata in image_table.scan(row_start=key[0], row_stop=key[1]):
            extractor = cv2.xfeatures2d.SIFT_create()
            sift = []

            try:
                # decode the stored image bytes
                iobuf = io.BytesIO(bindata["data:image"])
                im = Image.open(iobuf)
                im = cv2.cvtColor(np.asarray(im), cv2.COLOR_BGR2RGB)

                keypoints, features = extractor.detectAndCompute(im, None)
                if len(keypoints) == 0:
                    print("error: img:{}".format(img_id))
                # keep at most sift_count descriptors, strongest keypoints first
                ff = [(kp.response, fe) for kp, fe in zip(keypoints, features)]
                sf = sorted(ff, key=lambda item: item[0], reverse=True)
                num_sift = sift_count if len(sf) > sift_count else len(sf)
                for res, fe in sf[:num_sift]:
                    fe = [int(_) for _ in fe]
                    sift.append(fe)

                # write the feature matrix: serialize with np.savetxt, then base64-encode
                with feature_table.batch(batch_size=128) as bat:
                    bytesio = io.BytesIO()
                    np.savetxt(bytesio, sift)
                    content = bytesio.getvalue()
                    b64_code = base64.b64encode(content)
                    bat.put(row=img_id, data={"data:sift": b64_code})

            except Exception as e:
                print("error: img: {}: {}".format(img_id, str(e)))

    connection.close()


if __name__ == "__main__":
    # config
    spark = SparkSession.builder.appName("PySpark").getOrCreate()

    # build a driver-side connection to look up the region boundaries
    connection = happybase.Connection(host="172.168.16.70",
                                      port=9090,
                                      autoconnect=False,
                                      timeout=None,
                                      transport='buffered',
                                      protocol='binary')
    connection.open()

    # open the image table
    table = connection.table('claim:table1')

    # get its regions
    regions = table.regions()
    num_region = len(regions)

    # collect the (start_key, end_key) pair of every region
    keys = []
    for region in regions:
        keys.append((region["start_key"], region["end_key"]))

    # close the driver-side connection
    connection.close()

    # one partition per region; each partition is processed on an executor
    rdd = spark.sparkContext.parallelize(keys, num_region)

    rdd.foreachPartition(write_sift_for_each_partition)
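
To verify what the job wrote, the sketch below reads one row back from the feature table and reverses the encoding used above (base64 on top of np.savetxt). It assumes the job has already populated claim:table2 and that the Thrift server is still running:

import base64
import io

import numpy as np
import happybase

conn = happybase.Connection(host="172.168.16.70", port=9090)
conn.open()
feature_table = conn.table('claim:table2')

# fetch a single row and decode the stored SIFT matrix
for row_key, data in feature_table.scan(limit=1):
    sift = np.loadtxt(io.BytesIO(base64.b64decode(data[b"data:sift"])))
    print(row_key, sift.shape)   # at most 80 descriptors of dimension 128
conn.close()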

4 Launching the program from the terminal

spark-submit --master yarn \
--deploy-mode client \
--queue='queue_01' \
--archives /*/python2env.zip#python2env \
--conf spark.pyspark.driver.python=/*/python2.7/bin/python2.7 \
--conf spark.pyspark.python=/python2env/python2.7/bin/python2.7 \
--conf spark.yarn.executor.memoryOverhead=8G \
--conf "spark.executor.extraJavaOptions=-XX:+PrintGCTimeStamps -XX:MaxDirectMemorySize=4096m" \
--conf spark.executor.instances=1 \
--conf spark.default.parallelism=1 \
--conf spark.speculation=false \
--conf spark.speculation.quantile=0.2 \
--conf spark.speculation.multiplier=1.5 \
--executor-memory 20G \
--num-executors 50 \
--executor-cores 3 \
--driver-memory 1G \
sift.py


    Once the environment is set up, distributed computing tasks can be run following the examples above. For learning the ml library, see the examples in the project https://github.com/yinhaoxs/Pyspark-hbase; additional PySpark learning material is available at http://dblab.xmu.edu.cn/blog/1709-2/

    Setting up the whole distributed cluster takes some time; there are many potential environment conflicts and it is easy to make mistakes. The component versions used in this tutorial have been verified in practice not to conflict with one another, so they can be used as a reference.