Troubleshooting a port that is already in use: https://jingyan.baidu.com/article/20b68a8880c84d796dec6256.html
1、Source data preparation - 100%
1. Extracting values from JSON
Parsing JSON:
In Hive:
1. Extract a single value: select get_json_object('{"beForwardWeiboId":"","catchTime":"1387158370","commentCount":"211"}','$.catchTime');
2. Extract multiple values: select json_tuple('{"beForwardWeiboId":"","catchTime":"1387158370","commentCount":"211"}','catchTime','commentCount');
3. Extract values from a JSON array:
select json_tuple(get_json_object('[{"beForwardWeiboId":"","catchTime":"1387158370","commentCount":"211"}]','$[0]'),'catchTime','commentCount');
Reference: https://www.cnblogs.com/yfb918/p/10644262.html
2、Sorting and exporting the data - hive order by - 100%
Define the database and table
db_name=yzj
table_name=weibo_product
Define the export directories, on HDFS and on the local disk
save_data_dir_hdfs="/user/yzj/project/sorted_weibo_product/"
save_data_dir_local="../data/sorted_weibo_product/"
Run the query that exports the data
hive -e "
use $db_name;
insert overwrite directory '$save_data_dir_hdfs'
row format delimited fields terminated by '\001'
select * from $table_name
where day_seq=20120103 and created_at is not null and length(created_at)=19
order by created_at asc;
"
Download the data from HDFS to the local directory
hdfs dfs -get -f $save_data_dir_hdfs/000000_0 $save_data_dir_local
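Note the SQL statement: select * from $table_name where day_seq=20120103 and created_at is not null and length(created_at)=19 order by created_at asc;
Without the length(created_at)=19 filter the rows do not sort correctly. A likely reason, assuming created_at is stored as text in yyyy-MM-dd HH:mm:ss form (19 characters): order by compares the values as strings, so any value of a different length is malformed and ends up out of chronological order.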
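The effect of the length filter can be reproduced outside Hive. The sketch below is only an illustration with made-up sample values. It assumes created_at is text in yyyy-MM-dd HH:mm:ss form, and `sort_valid_timestamps` is a hypothetical helper, not part of the project.

```bash
#!/bin/bash
# Hypothetical helper: keep only 19-character lines, then sort them as plain text.
sort_valid_timestamps() {
    awk 'length($0) == 19' | LC_ALL=C sort
}

# Made-up sample: the second value is malformed (no zero padding).
sample=$'2012-01-03 10:00:00\n2012-1-3 9:00:00\n2012-01-03 09:30:00'

echo "plain text sort:"
printf '%s\n' "$sample" | LC_ALL=C sort
echo "with the length filter:"
printf '%s\n' "$sample" | sort_valid_timestamps
```

In the plain text sort the malformed 9:00 value lands after 10:00 even though it is earlier, because the comparison runs character by character. With the filter only well-formed values remain, and for those the text order matches the time order.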
Test:
```sql
select created_at from weibo_product where day_seq=20120103 and created_at is not null
and length(created_at)=19 order by created_at asc limit 100;
```
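3、Data receiver service - springboot restful service
1. Build a Spring Boot project with Maven and add a RESTful API endpoint.
Create a microservice endpoint
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
@EnableAutoConfiguration
public class HelloWorldService {
    // requests to /hello return a fixed greeting
    @RequestMapping("/hello")
    public String hello() {
        return "HelloWorld,世界你好";
    }

    public static void main(String[] args) {
        SpringApplication.run(HelloWorldService.class, args);
    }
}
Set up the launcher class
package com.tl.job016.bigdata;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class BootApplication {
    public static void main(String[] args) {
        SpringApplication.run(BootApplication.class, args);
    }
}
Configure application.properties
# web server configuration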
server.port=8085
server.contextPath=/tianliangedu
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tl.job016.bigdata</groupId>
<artifactId>DataReceiverAndSendService</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- Property configuration -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<java_source_version>1.8</java_source_version>
<java_target_version>1.8</java_target_version>
<file_encoding>utf-8</file_encoding>
</properties>
<!-- Configure the repository locations first. Aliyun is preferred; a mirror configuration also works, with the same effect -->
<repositories>
<repository>
<id>nexus-aliyun</id>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</repository>
<repository>
<id>spring-milestone</id>
<url>http://repo.spring.io/libs-release</url>
</repository>
</repositories>
<!-- Spring Boot parent -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.0.RELEASE</version>
</parent>
<dependencies>
<!-- Spring Boot core web starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<!-- Build an executable jar and specify the main class -->
<build>
<plugins>
<!-- all-in-one packaging -->
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<fork>true</fork> <!-- without this setting, devtools does not take effect -->
<mainClass>com.tl.job016.bigdata.BootApplication</mainClass>
</configuration>
<executions>
<execution>
<goals>
<goal>repackage</goal>
</goals>
</execution>
</executions>
</plugin>
<!-- <plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>com.tianliangedu.boot.ApplicationBootControler</mainClass>
</configuration>
</plugin> -->
</plugins>
</build>
</project>
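2. Build a jar and deploy it to Linux to check that it runs
Create RealTimeDataService, then vi start.sh
#! /bin/bash
# start the jar so the service is available to clients
java -jar DataReceiverAndSendService-1.0-SNAPSHOT.jar
Running start.sh failed because port 8085 was already in use, so server.port was changed to 8080. After changing the port, delete the target directory and rebuild; otherwise the service does not come up on the new port.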
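Before picking a new port it can help to check whether something is already listening on it. The following is a minimal sketch, assuming bash with its /dev/tcp redirection feature; `port_in_use` is a hypothetical helper, and it only detects listeners that accept connections on the given address.

```bash
#!/bin/bash
# Hypothetical helper: succeeds (exit status 0) when something accepts TCP
# connections on host:port. Relies on bash's /dev/tcp redirection.
port_in_use() {
    local host=$1 port=$2
    # The connection is opened in a subshell, so it is closed again right away.
    (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null
}

if port_in_use 127.0.0.1 8085; then
    echo "port 8085 is in use"
else
    echo "port 8085 is free"
fi
```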
Repackage, upload, and run start.sh again
It runs normally. Then test the endpoint from a new terminal window
[yzj@cluster3 ~]$ curl localhost:8080/tianliangedu/hello
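This prints "HelloWorld,世界你好", so the initial deployment works
How can the Linux port be reached from the local machine?
Run ifconfig on the server to find its IP address
Then open the address through the web proxy: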
http://172.17.0.10:8080/tianliangedu/hello
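Test complete
3. Deploy the data receiver on Linux
Create the class DataReceiverService
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class DataReceiverService {
    // receives one record in the "message" request parameter and prints it
    @RequestMapping("/receiver")
    public boolean receiver(String message) {
        System.out.println(message);
        return true;
    }
}
Test
At this point the service can receive data
Change the build configuration to separated packaging (dependencies are copied to lib/ instead of being bundled into the jar)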
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.tl.job016.bigdata</groupId>
<artifactId>DataReceiverAndSendService</artifactId>
<version>1.0-SNAPSHOT</version>
<!-- Property configuration -->
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
<java_source_version>1.8</java_source_version>
<java_target_version>1.8</java_target_version>
<file_encoding>utf-8</file_encoding>
</properties>
<!-- Configure the repository locations first. Aliyun is preferred; a mirror configuration also works, with the same effect -->
<repositories>
<repository>
<id>nexus-aliyun</id>
<name>Nexus aliyun</name>
<url>http://maven.aliyun.com/nexus/content/groups/public</url>
</repository>
<repository>
<id>spring-milestone</id>
<url>http://repo.spring.io/libs-release</url>
</repository>
</repositories>
<!-- Spring Boot parent -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.4.0.RELEASE</version>
</parent>
<dependencies>
<!-- Spring Boot core web starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<!-- Build an executable jar and specify the main class -->
<build>
<plugins>
<!-- all-in-one packaging (disabled) -->
<!--<plugin>-->
<!--<groupId>org.springframework.boot</groupId>-->
<!--<artifactId>spring-boot-maven-plugin</artifactId>-->
<!--<configuration>-->
<!--<fork>true</fork> <!– without this setting, devtools does not take effect –>-->
<!--<mainClass>com.tl.job016.bigdata.BootApplication</mainClass>-->
<!--</configuration>-->
<!--<executions>-->
<!--<execution>-->
<!--<goals>-->
<!--<goal>repackage</goal>-->
<!--</goals>-->
<!--</execution>-->
<!--</executions>-->
<!--</plugin>-->
<!-- <plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<configuration>
<mainClass>com.tianliangedu.boot.ApplicationBootControler</mainClass>
</configuration>
</plugin> -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>1.7</source>
<target>1.7</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-resources-plugin</artifactId>
<!-- Fixes encoding problems in resource files -->
<configuration>
<encoding>UTF-8</encoding>
</configuration>
<executions>
<execution>
<id>copy-resources</id>
<phase>validate</phase>
<goals>
<goal>copy-resources</goal>
</goals>
<configuration>
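<!-- Copy the resource files into the build directory, matching the packaging below that leaves resource files out of the jar. Another directory such as conf also works: ${project.build.directory}/conf -->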
<outputDirectory>${project.build.directory}</outputDirectory>
<resources>
<resource>
<directory>src/main/resources</directory>
<filtering>true</filtering>
</resource>
</resources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-jar-plugin</artifactId>
<configuration>
<archive>
<manifest>
<addClasspath>true</addClasspath>
<classpathPrefix>lib/</classpathPrefix>
<mainClass>com.tl.job016.bigdata.BootApplication</mainClass><!-- entry point (main method) -->
<addDefaultImplementationEntries>true</addDefaultImplementationEntries>
<addDefaultSpecificationEntries>true</addDefaultSpecificationEntries>
</manifest>
<manifestEntries>
<!--<Permissions>${Permissions}</Permissions>-->
<!--<Caller-Allowable-Codebase>${Caller-Allowable-Codebase}</Caller-Allowable-Codebase>-->
<Class-Path>./</Class-Path>
</manifestEntries>
</archive>
</configuration>
</plugin>
<!-- Compiler settings: Java version and source file encoding -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>2.3.2</version>
<configuration>
<source>${java_source_version}</source>
<target>${java_target_version}</target>
<encoding>${file_encoding}</encoding>
<showDeprecation>true</showDeprecation>
<showWarnings>true</showWarnings>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
<executions>
<execution>
<id>copy-dependencies</id>
<phase>package</phase>
<configuration>
<overWriteReleases>false</overWriteReleases>
<overWriteSnapshots>false</overWriteSnapshots>
<overWriteIfNewer>true</overWriteIfNewer>
<!-- Copy the dependency jars into the lib directory -->
<outputDirectory>
${project.build.directory}/lib
</outputDirectory>
</configuration>
<goals>
<goal>copy-dependencies</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
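First test locally: copy the build outputs into one folder and check with sbt that the service runs.
Test succeeded
Upload the three files to Linux. The lib directory can be archived first and extracted after the upload.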
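Archiving lib before the upload and extracting it on the server could look like the sketch below. `pack_release` and `unpack_release` are hypothetical helpers, and the directory and archive names passed to them are placeholders rather than the project's actual layout.

```bash
#!/bin/bash
# Hypothetical helpers; directory and archive names are placeholders.

# Pack everything under src_dir into a gzip-compressed tar archive.
pack_release() {
    local src_dir=$1 archive=$2
    tar -czf "$archive" -C "$src_dir" .
}

# Extract the archive into dest_dir, creating the directory if needed.
unpack_release() {
    local archive=$1 dest_dir=$2
    mkdir -p "$dest_dir"
    tar -xzf "$archive" -C "$dest_dir"
}
```

For example, `pack_release target/lib lib.tar.gz` on the build machine, then `unpack_release lib.tar.gz lib` next to the jar on the server. The archive should be written outside the directory being packed.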
Then run start.sh
Then test from another window
curl localhost:8080/tianliangedu/hello
curl localhost:8080/tianliangedu/receiver?message=123
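The original window prints the message, so the test succeeded
4. Simulate a crawler: read the sorted data file and send the records one at a time to the data receiver service.
Edit send_data_to_data_receiver.sh (test version)
#! /bin/bash
# location of the source data
weibo_data_filepath=../data/sorted_weibo_product/000000_0
# location of the service that receives the data
receiver_service=http://localhost:8080/tianliangedu/receiver
# simulate handing collected data to the endpoint; the core is shell scripting plus curl
# while read loops as long as there is input; the variable name line is arbitrary
cat $weibo_data_filepath | head -10 | while read line;
do
    #echo $line
    curl $receiver_service?message=中国
done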
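Final version
#! /bin/bash
# location of the source data
weibo_data_filepath=../data/sorted_weibo_product/000000_0
# location of the service that receives the data
receiver_service=http://localhost:8080/tianliangedu/receiver
# simulate handing collected data to the endpoint; the core is shell scripting plus curl
# IFS= and -r keep leading whitespace and backslashes in each record intact
cat $weibo_data_filepath | head -100000 | tail -1000 | while IFS= read -r line;
do
    #echo $line    prints every record
    #curl $receiver_service?message=中国
    #curl $receiver_service?message=$line    does not work: the records contain special characters and other noise, and a bare query string only suits simple parameters
    curl -X POST --data-urlencode "message=$line" $receiver_service
    #sleep 1s
done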
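The reason a raw record cannot go into the query string is that characters such as spaces, `&`, `=`, the \001 separators, and non-ASCII text have to be percent-encoded first. The sketch below shows that encoding in plain bash. It is an illustration of the idea, not curl's own implementation, and `urlencode` is a hypothetical helper.

```bash
#!/bin/bash
# Hypothetical helper: percent-encode every byte except letters, digits and - . _ ~
urlencode() {
    local hex ch out=
    # od prints each input byte as two hex digits; word splitting yields one byte per loop
    for hex in $(printf '%s' "$1" | od -An -v -tx1); do
        case $hex in
            2d|2e|5f|7e|3[0-9]|4[1-9a-f]|5[0-9a]|6[1-9a-f]|7[0-9a])
                # unreserved characters stay as they are
                printf -v ch "\\x$hex"
                out+=$ch
                ;;
            *)
                # everything else becomes %XX with upper-case hex digits
                out+="%${hex^^}"
                ;;
        esac
    done
    printf '%s\n' "$out"
}

# Made-up record containing a separator byte, a space and non-ASCII text
urlencode $'id=1\001hello world\001中国'
```

With `--data-urlencode` curl takes care of this step itself, which is why the final version of the script can pass `$line` unchanged.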
4、Forwarding data to the kafka cluster - kafka producer
Create a kafka package and put the Kafka producer and consumer classes in it
Producer: KafkaProducerUtil
package com.tl.job016.bigdata.kafka;

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducerUtil {
    // the underlying producer
    public KafkaProducer<String, String> producer;

    // brokerList is given as hostname:port; separate multiple brokers with commas
    public KafkaProducerUtil(String brokerList) {
        Properties props = new Properties();
        // server ip:port, comma-separated for a cluster
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        // key serializer class
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        // value serializer class
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                StringSerializer.class.getName());
        // create the producer
        producer = new KafkaProducer<String, String>(props);
    }

    public void close() {
        this.producer.close();
    }
}
Write a service class DataSendService that initializes the producer and uses it to send data to Kafka
import com.tl.job016.bigdata.kafka.KafkaProducerUtil;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DataSendService {
    public static KafkaProducerUtil kafkaProducerUtil = null;
    public static String topic = null;
    public static String brokerList = null;

    static {
        // initialize the broker list
        brokerList = "cluster1.hadoop:6667,cluster0.hadoop:6667";
        topic = "YzjKafkaTest";
        // initialize the producer utility
        kafkaProducerUtil = new KafkaProducerUtil(brokerList);
    }

    // send one message to Kafka through the producer
    // send() is asynchronous: true means the record was handed to the producer,
    // not that the broker has acknowledged it
    public static boolean sendMessage(String message) {
        try {
            kafkaProducerUtil.producer.send(new ProducerRecord<String, String>(
                    topic, message));
            return true;
        } catch (Exception e) {
            return false;
        }
    }
}
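Package and upload
- sh start.sh to run the Spring Boot project
- sh send_data_to_data_receiver.sh to send the data to the project, whose endpoint then forwards it to Kafka; forward 10 records first as a test
- start a consumer to check whether the data reached the Kafka cluster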
sh /usr/hdp/3.1.0.0-78/kafka/bin/kafka-console-consumer.sh --bootstrap-server cluster0.hadoop:6667 --topic YzjKafkaTest --from-beginning
Success
Testing through the web proxy also works
5、kafka cluster setup (done)
6、Consuming and persisting real-time data - hourly files - kafka consumer
Test version of consumer_data_to_dw_sdf.sh
#! /bin/bash
# topic to consume
topic=YzjKafkaTest
# kafka consumer parameters
bootstrap_server=cluster0.hadoop:6667
# consume the data and write it to a local file
sh /usr/hdp/3.1.0.0-78/kafka/bin/kafka-console-consumer.sh --bootstrap-server $bootstrap_server --topic $topic --from-beginning >weibo_data.log
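This creates a weibo_data.log file locally
Viewing it with more weibo_data.log shows that the data in Kafka was written to the local disk, so the test succeeded
Shell substring extraction: https://blog.csdn.net/YF_Li123/article/details/89202287
Starting at the Nth character from the left, take M characters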
echo ${var:8:3}www
Note: ":8:3" means start at the 9th character from the left (offset 8, counting from 0) and take 3 characters to the right
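The relation between "the Nth character" and the zero-based offset is easy to get wrong, so here is a small sketch. The sample string is made up and `substr_from` is a hypothetical helper.

```bash
#!/bin/bash
# Hypothetical helper: take `count` characters starting at the 1-based position `pos`.
substr_from() {
    local text=$1 pos=$2 count=$3
    # bash offsets are zero-based, so position N is offset N-1
    printf '%s\n' "${text:pos-1:count}"
}

var="abcdefghijkl"       # made-up sample string
echo "${var:8:3}"        # offset 8 is the 9th character; prints ijk
substr_from "$var" 9 3   # same result through the helper
```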
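Next, produce hourly real-time data files
Write the consumer script that persists the data, consumer_data_to_dw_sdf.sh
#! /bin/bash
# topic to consume
topic=YzjKafkaTest
# kafka consumer parameters
bootstrap_server=cluster0.hadoop:6667
# consume the data and append each record to the file for its hour
sh /usr/hdp/3.1.0.0-78/kafka/bin/kafka-console-consumer.sh --bootstrap-server $bootstrap_server --topic $topic --from-beginning | while IFS= read -r doc;
do
    #echo $doc
    # created_at is the 9th field; fields are separated by \001
    created_at=`echo "$doc" | awk -F '\001' '{print $9}'`
    # yyyy-MM-dd HH:mm:ss -> yyyyMMddHH
    real_time_value=${created_at:0:4}${created_at:5:2}${created_at:8:2}${created_at:11:2}
    #echo $real_time_value    > overwrites the file, >> appends to it
    echo "$doc" >> $real_time_value.log
done
nohup sh consumer_data_to_dw_sdf.sh &
This keeps the consumer running in the background, even after the terminal is closed
Adjust the parameters in send_data_to_data_receiver.sh and run it
The data is then written to one log file per hour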
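The hour-bucketing step of consumer_data_to_dw_sdf.sh can be tried on its own, without Kafka. The sketch below assumes, as the script does, that created_at is the 9th \001-separated field and has the form yyyy-MM-dd HH:mm:ss. `hour_bucket` is a hypothetical helper and the record is made up.

```bash
#!/bin/bash
# Hypothetical helper: map one \001-delimited record to its yyyyMMddHH bucket.
hour_bucket() {
    local doc=$1 created_at
    created_at=$(printf '%s\n' "$doc" | awk -F '\001' '{print $9}')
    printf '%s\n' "${created_at:0:4}${created_at:5:2}${created_at:8:2}${created_at:11:2}"
}

# Made-up record with ten fields; the 9th is created_at.
sep=$'\001'
doc="f1${sep}f2${sep}f3${sep}f4${sep}f5${sep}f6${sep}f7${sep}f8${sep}2012-01-03 05:12:33${sep}f10"
echo "$(hour_bucket "$doc").log"   # file the record would be appended to
```

The `${var:offset:length}` expansion is a bash feature, so the script relies on `sh` being bash on the machine where it runs.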