-
Notifications
You must be signed in to change notification settings - Fork 232
/
Copy path参考步骤.txt
389 lines (342 loc) · 13.8 KB
/
参考步骤.txt
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
1.创建hbase表
create 'weblogs','info'
2.配置flume文件
node2中:
a2.sources = r1
a2.sinks = k1
a2.channels = c1
a2.sources.r1.type = exec
a2.sources.r1.command = tail -F /opt/data/weblog-flume.log
a2.sources.r1.channels = c1
a2.channels.c1.type = memory
a2.channels.c1.capacity = 1000
a2.channels.c1.transactionCapacity = 1000
a2.channels.c1.keep-alive = 5
a2.sinks.k1.type = avro
a2.sinks.k1.channel = c1
a2.sinks.k1.hostname = node1
a2.sinks.k1.port = 5555
#######################################################################
node3中:
a3.sources = r1
a3.sinks = k1
a3.channels = c1
a3.sources.r1.type = exec
a3.sources.r1.command = tail -F /opt/data/weblog-flume.log
a3.sources.r1.channels = c1
a3.channels.c1.type = memory
a3.channels.c1.capacity = 1000
a3.channels.c1.transactionCapacity = 1000
a3.channels.c1.keep-alive = 5
a3.sinks.k1.type = avro
a3.sinks.k1.channel = c1
a3.sinks.k1.hostname = node1
a3.sinks.k1.port = 5555
########################################################################
node1中:
a1.sources = r1
a1.channels = kafkaC hbaseC
a1.sinks = kafkaSink hbaseSink
a1.sources.r1.type = avro
a1.sources.r1.channels = hbaseC kafkaC
a1.sources.r1.bind = node1
a1.sources.r1.port = 5555
a1.sources.r1.threads = 5
#****************************flume + hbase******************************
a1.channels.hbaseC.type = memory
a1.channels.hbaseC.capacity = 10000
a1.channels.hbaseC.transactionCapacity = 10000
a1.channels.hbaseC.keep-alive = 20
a1.sinks.hbaseSink.type = asynchbase
a1.sinks.hbaseSink.table = weblogs
a1.sinks.hbaseSink.columnFamily = info
a1.sinks.hbaseSink.serializer = org.apache.flume.sink.hbase.KfkAsyncHbaseEventSerializer
a1.sinks.hbaseSink.channel = hbaseC
a1.sinks.hbaseSink.serializer.payloadColumn = datetime,userid,searchname,retorder,cliorder,cliurl
#****************************flume + kafka******************************
a1.channels.kafkaC.type = memory
a1.channels.kafkaC.capacity = 10000
a1.channels.kafkaC.transactionCapacity = 10000
a1.channels.kafkaC.keep-alive = 20
a1.sinks.kafkaSink.channel = kafkaC
a1.sinks.kafkaSink.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.kafkaSink.brokerList = node1:9092,node2:9092,node3:9092
a1.sinks.kafkaSink.topic = weblogs
a1.sinks.kafkaSink.zookeeperConnect = node1:2181,node2:2181,node3:2181
a1.sinks.kafkaSink.requiredAcks = 1
a1.sinks.kafkaSink.batchSize = 1
a1.sinks.kafkaSink.serializer.class = kafka.serializer.StringEncoder
3.日志数据格式处理
cat weblog.log |tr "\t" "," > weblog2.log // 将制表符改为逗号
cat word.txt | sed 's/[ ][ ]*/,/g' // 将多个空格换位逗号
4.自定义flume的hbase sink并打成jar包上传到flume/lib下
5.创建weblogs项目来采集数据,并打成jar包发布到服务器node2和node3(/opt/jars)
6.编写启动jar包weblogs程序的shell在node2和node3(/opt/shell)
touch weblog-shell.sh
------------------------------------------------------------------------------
#/bin/bash
echo "start log......"
java -jar /opt/jars/weblogs.jar /opt/data/weblog.log /opt/data/weblog-flume.log
-------------------------------------------------------------------------------
7.编写flume集群服务启动脚本(node1,node2,node3中flume目录下,下例在node2中,node3中a2改为a3)
#/bin/bash
echo "flume-2 start"
bin/flume-ng agent --conf conf -f conf/flume-conf.properties -n a2 -Dflume.root.logger=INFO.console
8.编写测试kafka消费的shell
vi kfk-test-consumer.sh
#/bin/bash
echo "kfk-kafka-consumer.sh start......"
bin/kafka-console-consumer.sh --zookeeper node1:2181,node2:2181,node3:2181 --from-beginning --topic weblogs
9.进行测试flume采集数据的全流程
1)启动hdfs、zookeeper、kafka、flume
2)启动weblog-shell.sh脚本
3)创建名为weblogs的topic
bin/kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --topic weblogs --partitions 1 --replication-factor 3
4)启动node2、node3的脚本发送数据到node1
5)启动node1的脚本接收node2、node3的数据,发送到hbase和kafka
10.安装mysql
11.安装hive(启动hive前需先启动yarn,因为mapreduce需在yarn上运行)
1)启动:bin/hive
2)测试加载数据到hive:
load data local inpath '/opt/data/test.txt' into table test;
3)根据业务需求创建表结构
CREATE EXTERNAL TABLE weblogs(
id string,
datetime string,
userid string,
searchname string,
retorder string,
cliorder string,
cliurl string
)
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES("hbase.columns.mapping"=
":key,info:datetime,info:userid,info:searchname,info:retorder,info:cliorder,info:cliurl")
TBLPROPERTIES("hbase.table.name"="weblogs");
12.Hive与Hbase集成
1)第一种方式,比较麻烦,将hbase下配置文件拷贝到hive/conf下
2)第二种方式
a)在hive-site.xml中配置
<property>
<name>hbase.zookeeper.quorum</name>
<value>node1,node2,node3</value>
</property>
b)将hbase的9个jar拷贝到hive/lib下(high-scale-lib-1.1.2.jar自己下载)
export HBASE_HOME=/opt/soft/hbase
export HIVE_LIB=/opt/soft/hive-1.2.1-bin
ln -s $HBASE_HOME/lib/hbase-server-1.1.3.jar $HIVE_LIB/lib/hbase-server-1.1.3.jar
ln -s $HBASE_HOME/lib/hbase-client-1.1.3.jar $HIVE_LIB/lib/hbase-client-1.1.3.jar
ln -s $HBASE_HOME/lib/hbase-protocol-1.1.3.jar $HIVE_LIB/lib/hbase-protocol-1.1.3.jar
ln -s $HBASE_HOME/lib/hbase-it-1.1.3.jar $HIVE_LIB/lib/hbase-it-1.1.3.jar
ln -s $HBASE_HOME/lib/htrace-core-3.1.0-incubating.jar $HIVE_LIB/lib/htrace-core-3.1.0-incubating.jar
ln -s $HBASE_HOME/lib/hbase-hadoop2-compat-1.1.3.jar $HIVE_LIB/lib/hbase-hadoop2-compat-1.1.3.jar
ln -s $HBASE_HOME/lib/hbase-hadoop-compat-1.1.3.jar $HIVE_LIB/lib/hbase-hadoop-compat-1.1.3.jar
ln -s $HBASE_HOME/lib/high-scale-lib-1.1.2.jar $HIVE_LIB/lib/high-scale-lib-1.1.2.jar
ln -s $HBASE_HOME/lib/hbase-common-1.1.3.jar $HIVE_LIB/lib/hbase-common-1.1.3.jar
13.Hue安装部署
1)下载
2)编译Hue
a)安装需要依赖的包(下面的包可能多了几个)
yum install ant asciidoc cyrus-sasl-devel cyrus-sasl-gssapi cyrus-sasl-plain gcc gcc-c++ krb5-devel libtidy libffi-devel libxml2-devel libxslt-devel make mysql mysql-devel openldap-devel python-devel sqlite-devel gmp-devel openssl-devel mysql-devel
b)hue文件中:make apps
3)配置(vi $HUE_HOME/desktop/conf/hue.ini)
secret_key=jFE93j;2[290-eiw.KEiwN2s3['d;/.q[eIW^y#e=+Iei*@Mn < qW5o
http_host=node3
http_port=8888
time_zone=Asia/Shanghai
4)设置desktop.db的权限
[root@node3 desktop]# chmod o+w desktop.db
5)启动服务
[root@node3 hue-4.0.0]# ./build/env/bin/supervisor
如果出现错误KeyError: "Couldn't get user id for user hue"
如下:adduser hue,并将desktop.db改为hue:hue下,不要在root下
chown -R hue:hue desktop.db
6)Hue与Hive集成(hue.ini)
fs_defaultfs=hdfs://node1:8020 // hdfs默认路径
webhdfs_url=http://node1:50070/webhdfs/v1
hadoop_conf_dir=/opt/soft/hadoop-2.6.4/etc/hadoop
hadoop_bin=/opt/soft/hadoop-2.6.4/bin
hadoop_hdfs_home=/opt/soft/hadoop-2.6.4
------------------------------------------------------------
// 在三台hadoop中的core-site.xml中添加内容:
<property>
<name>hadoop.proxyuser.hue.hosts</name>
<value>*</value>
</property>
<property>
<name>hadoop.proxyuser.hue.groups</name>
<value>*</value>
</property>
------------------------------------------------------------
启动hdfs:
start-dfs.sh
------------------------------------------------------------
访问url:
http://node3:8888/filebrowser/
7)Hue与Yarn集成(hue.ini)
resourcemanager_host=zxl2
resourcemanager_port=8032
resourcemanager_api_url=http://node1:8088
proxy_api_url=http://node1:8088
history_server_api_url=http://node1:19888
------------------------------------------------------------
启动yarn:
start-yarn.sh
8)Hue与Hive集成(Hue.ini)
hive_server_host=node3
hive_server_port=10000
hive_conf_dir=/opt/soft/hive-1.2.1-bin/conf
------------------------------------------------------------
启动hive
[root@node3 bin]# ./hive --service hiveserver2
9)Hue与Mysql集成(Hue.ini)
nice_name="My SQL DB" // 随意配置
name=metastore // 数据库名
engine=mysql
host=node3
port=3306
user=root
password=1234
注意:[[[mysql]]]前的##要删掉
10)Hue与Hbase集成(Hue.ini)
hbase_clusters=(Cluster|node1:9090) // 随意配置集群中某一台hbase
hbase_conf_dir=/opt/soft/hbase/conf
------------------------------------------------------------
启动hbase(thrift)
[root@node1 hbase]# bin/start-hbase.sh
[root@node1 hbase]# bin/hbase-daemon.sh start thrift // 启动一个就行
14.配置Spark集群模式,因为受内存的影响,配置为standlone模式
15.配置Spark SQL与Hive集成(此次在安装Hive的服务器里的spark配置)
1)将hive的配置文件hive-site.xml拷贝到spark conf目录,同时添加metastore的url配置
vi hive-site.xml
<property>
<name>hive.metastore.uris</name>
<value>thrift://node3:9083</value>
</property>
2)拷贝hive中的mysql jar包到spark的jar目录下
cp hive-1.2.1-bin/lib/mysql-connector-java-5.1.35-bin.jar spark-2.2.0/jars/
3)检查spark-env.sh 文件中的配置项
vi spark-env.sh
HADOOP_CONF_DIR=/opt/soft/hadoop-2.6.4/etc/hadoop
4)启动mysql
service mysqld start
5)启动hive metastore服务
bin/hive --service metastore
6)启动hive并测试下
bin/hive
show databases;
create database zxl;
use zxl;
create table if not exists test(userid string,username string)ROW FORMAT DELIMITED FIELDS TERMINATED BY ' ' STORED AS textfile;
load data local inpath "/opt/data/test.txt" into table test;
more /opt/data/test.txt
0001 spark
0002 hive
0003 hbase
0004 hadoop
7)启动spark-shell
bin/spark-shell
spark.sql("select * from zxl.test").show
8)展示启动spark-sql
bin/spark-sql
show databases; #查看数据库
default
zxl
use zxl; #使用数据库
show tables; #查看表
test
select * from test; #查看表数据
9)Spark SQL之ThriftServer和beeline使用
a)启动ThriftServer
sbin/start-thriftserver.sh
b)启动beeline
bin/beeline !connect jdbc:hive2://node3:10000
show databases; #查看数据库
select * from kfk.test; #查看表数据
16.配置Spark SQL与MySQL集成(spark1为test数据库中的表)
启动spark-shell
bin/spark-shell
:paste #可以多行输入,包括注释,需顶格书写
val jdbcDF = spark.read
.format("jdbc")
.option("url", "jdbc:mysql://node3:3306/test")
.option("dbtable", "spark1")
.option("user", "root")
.option("password", 1234)
.load()
ctr+d #退出
#打印读取数据
jdbcDF.show
17.配置Spark SQL与Hbase集成
Spark SQL与HBase集成,其核心就是Spark Sql通过hive外部表来获取HBase的表数据。
1)拷贝HBase的包和hive包到spark 的jars目录下
hbase-client-1.1.3.jar
hbase-common-1.1.3.jar
hbase-protocol-1.1.3.jar
hbase-server-1.1.3.jar
hive-hbase-handler-1.2.1.jar
htrace-core-3.1.0-incubating.jar #incubating表示刚出现版本
mysql-connector-java-5.1.35-bin.jar
2)启动spark-shell
bin/spark-shell
val df =spark.sql("select count(1) from weblogs").show
18.安装nc作为外部数据源
yum -y install nc 或者 rpm安装(rpm -ivh nc-1.84-24.el6.x86_64.rpm)
19.简单运行nc与spark例子
[root@node2 ~]# nc -lk 9999
[root@node2 spark-2.2.0]# bin/run-example --master local[2] streaming.NetworkWordCount localhost 9999
注:记得设置master时,local[n],n的值一定要大于worker的个数
20.Spark Streaming结果数据保存到外部数据库(mysql)
// 一般与数据库建立连接时,使用foreachPartition来避免频繁创建数据库连接
Class.forName("com.mysql.jdbc.Driver")
val conn = DriverManager
.getConnection("jdbc:mysql://node3:3306/test","root","1234")
try{
for(row <- line){
val sql = "insert into webCount(titleName,count)values('"+row._1+"',"+row._2+")"
conn.prepareStatement(sql).executeUpdate()
}
}finally {
conn.close()
}
21.StructuredStreaming与kafka、mysql集成
添加spark一些jar,spark+kfk和spark+hbase
22.创建表webCount用来接收数据
CREATE TABLE `webCount` (
`titleName` varchar(255) DEFAULT NULL,
`count` int(11) DEFAULT NULL
) ENGINE=MyISAM DEFAULT CHARSET=utf8;
23.简单流程测试:
1)启动zookeeper:zkServer.sh start
2)启动dfs:start-dfs.sh
3)启动hbase:start-hbase.sh
4)启动mysql;service mysqld start
5)node2(node3)启动flume:flume-kfk-start.sh
6)node1启动flume:flume-kfk-start.sh
7)启动kafka-0.10(最好三台都启动,不然易出错):
bin/kafka-server-start.sh config/server.properties > kafka.log 2>&1 &
8)启动node2(node3)中的脚本:weblog-shell.sh
9)启动应用程序
10)解决Structured Streaming向数据库写入乱码
1)修改数据库文件my.cnf(linux下)
vi my.cnf
-----------------------------------------------------------------------------
[client]
socket=/var/lib/mysql/mysql.sock //添加
default-character-set=utf8 //添加
[mysqld]
character-set-server=utf8 //添加
datadir=/var/lib/mysql
socket=/var/lib/mysql/mysql.sock
user=mysql
# Disabling symbolic-links is recommended to prevent assorted security risks
symbolic-links=0
[mysqld_safe]
log-error=/var/log/mysqld.log
pid-file=/var/run/mysqld/mysqld.pid
-----------------------------------------------------------------------------
2)建表时形如下:
CREATE TABLE `webCount` (
`titleName` varchar(255) CHARACTER SET utf8 DEFAULT NULL,
`count` int(11) DEFAULT NULL
) ENGINE=InnoDB DEFAULT CHARSET=utf8;