Hive

安装mysql:
https://www.cyberithub.com/install-mysql/

添加’root’@’%’账号
https://linuxize.com/post/how-to-create-mysql-user-accounts-and-grant-privileges/

下载mysql-connector/J
maven repository: https://mvnrepository.com/artifact/mysql/mysql-connector-java/8.0.26

hive-to-mysql:
https://data-flair.training/blogs/configure-hive-metastore-to-mysql/

Supported Backend Databases for Metastore:
https://cwiki.apache.org/confluence/display/Hive/AdminManual+Metastore+Administration

安装hive:
https://cwiki.apache.org/confluence/display/Hive/GettingStarted#GettingStarted-InstallationandConfiguration

dbeaver连接hive报错
Required field ‘serverProtocolVersion’ is unset! Struct:TOpenSessionResp(status:TStatus(statusCode:ERROR_STATUS, infoMessages:[*org.apache.hive.service.cli.HiveSQLException:Failed to open new session: java.lang.RuntimeException: org.apache.hadoop.ipc.RemoteException(org.apache.hadoop.security.authorize.AuthorizationException): User: root is not allowed to impersonate root:14:13,

https://www.daimajiaoliu.com/daima/4edfede79900418
https://stackoverflow.com/questions/25073792/error-e0902-exception-occured-user-root-is-not-allowed-to-impersonate-root

确认hadoop集群重启过。

https://henning.kropponline.de/2014/06/17/getting-started-orc-hcatalog-2/

ORCFile in HDP 2: Better Compression, Better Performance
https://blog.cloudera.com/orcfile-in-hdp-2-better-compression-better-performance/

1,USA,Washington,328
2,France,Paris,67
3,Spain,Madrid,47
4,Russia,Moscow,145
5,Indonesia,Jakarta,267
6,Nigeria,Abuja,196
7,China,北京,99

create external table if not exists countries_list(
id int,
name string,
capital string,
population int
)
comment ‘List of Countries’
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ‘,’
STORED AS TEXTFILE
LOCATION ‘/test/countries’;

“select hiveKey,hiveValue from “+hcatDatabase+”.hiveMeta where hiveKey like ‘“ + db + “|%’”;
hiveMeta表怎么来的?
记录每种日志扩展字段对应hive表字段的index。

HiveSink4OrcWriter writer = new HiveSink4OrcWriter(fs, dataLocation, fileHash, pkey, writeOptions);
先写临时文件,然后提交

writer.close(null);
writer.commit();

updateOffset(positionMap); 记录每个分区消费到了哪里

OrcFile.createWriter(tempLocation, options);
writeOptions

session.timeout.ms
heartbeat.interval.ms

max.poll.interval.ms 参数。它限定了 Consumer 端应用程序两次调用 poll 方法的最大时间间隔。它的默认值是 5 分钟,表示你的 Consumer 程序如果在 5 分钟之内无法消费完 poll 方法返回的消息,那么 Consumer 会主动发起“离开组”的请求,Coordinator 也会开启新一轮 Rebalance。

第一类非必要 Rebalance 是因为未能及时发送心跳,导致 Consumer 被“踢出”Group 而引发的。
第二类非必要 Rebalance 是 Consumer 消费时间过长导致的。
Consumer 端的 GC 表现

E:\IdeaProjects\test\rebalance\serverlog

ssh hdp007.jg.6ght.com
find /usr/hdp/2.6.2.0-205/kafka/logs/ -type f -regex “.?/server.log.2021-0[89]-.“ -print0 | xargs -0 grep “Preparing to restabilize group hdfs”

find /usr/hdp/3.1.4.0-315/kafka/logs/ -type f -regex “.?/server.log.2021-0[89]-.“ -print0 | xargs -0 grep “Preparing to rebalance group”

consumerAndRecords.get().poll(); timeout=2m
拉取数据2分钟,拉取数据1000000,写入数据要多久?5分钟?如果这么慢,早挂了
For consumers using a non-null group.instance.id which reach this timeout, partitions will not be immediately reassigned. Instead, the consumer will stop sending heartbeats and partitions will be reassigned after expiration of session.timeout.ms
5分钟之后还可以再消费数据吗?
session.timeout.ms太大会不会有问题

hdfs数据去重
hive sql group 分组,找出有重复的数据
创建新的hive表,把distinct数插进去
备份源数据,把新数据通过hdfs dfs -mv覆盖过去

logRootDir = otherArgs[0]; 合并程序的根目录
总结下reduce的过程

1.ChannelProcessor.processEventBatch()批量处理事件
2.KafkaTransaction.doCommit()发送所有事件,然后等待所有事件发送成功
3.发送事件的方法为KafkaProducer.send(),逻辑是加入队列RecordAccumulator,等待Sender线程发送。
4.Sender线程将发送中的请求加入队列InFlightRequests,等待完成。
5.Flume正常关闭时,要求队列RecordAccumulator和队列InFlightRequests都为空,已读取的事件全部处理完,KafkaProducer.close()
6.Flume关闭时收到interrupt信号,调用KafkaProducer.forceClose(),忽略还处于队列的事件,但是file position会记录,此时出现数据丢失
7.Flume强制关闭,file position记录失败,出现数据重复。

通过hivesql写入hdfs,而不是orcfile api,降低失败率

HiveSink

DEFAULT_BATCHSIZE = 15000;
1000条一个批次,写入hive

kafka
DEFAULT_POLL_TIMEOUT = 500;
consumer.poll(Duration.ofMillis(pollTimeout)); 一次拉取多少条数据或者多少bytes数据?

HiveSink
for (; txnEventCount < batchSize; ++txnEventCount) {
// 0) Read event from Channel
Event event = channel.take();
writer.write(event);
}
边拉数据边写hive

Kafka

worker

connector.taskClass().getName();
connector.taskClass=MirrorSourceTask

MirrorConnectorConfig
Map sourceConsumerConfig()
默认用了什么消费者组?

Remote Cluster Utils
A utility class RemoteClusterUtils will leverage the internal topics described above to assist in computing reachability, inter-cluster lag, and offset translation. It won’t be possible to directly translate any given offset, since not all offsets will be captured in the checkpoint stream. But for a given consumer group, it will be possible to find high water marks that consumers can seek() to. This is useful for inter-cluster consumer migration, failover, etc.

Compatibility, Deprecation, and Migration Plan
A new version of ./bin/kafka-mirror-maker.sh will be implemented to run MM2 in “legacy mode”,

MirrorMaker Clusters
The MirrorMaker.java driver class and ./bin/connect-mirror-maker.sh script implement a distributed MM2 cluster which does not depend on an existing Connect cluster. Instead, MM2 cluster nodes manage Connect workers internally based on a high-level configuration file.

  1. 没有master,mirror maker cluster如何工作?
  2. worker节点容灾的处理
  3. 主题和分区的分配
  4. 消费者组?

读完理论篇,然后测试下

prerequisite : a set of Kafka brokers

because Kafka Connect stores connector configurations, status, and offset information inside the Kafka cluster where it is safely replicated, losing the node where a Connect worker runs does not result in any lost data.

Distributed Workers that are configured with matching group.id values automatically discover each other and form a Kafka Connect cluster. All Workers in the cluster use the same three internal topics to share connector configurations, offset data, and status updates.

ConnectProtocol.Assignment

MirrorSourceTask

Work.startTask()

WorkerSourceTask

commitTaskRecord(preTransformRecord, recordMetadata);
每发一条消息就需要写一次offset,mm1是发送一批再写offset。

[2021-10-22 16:53:24,505] INFO AdminClientConfig values:
bootstrap.servers = [hdp2:9092]

bin/kafka-producer-perf-test.sh
—topic aa \
—throughput -1
—num-records 3000000
—record-size 1024
—producer-props acks=all bootstrap.servers=localhost:9092

[2021-10-22 18:03:45,174] WARN Could not create topic-partitions for hdp2.aa. (org.apache.kafka.connect.mirror.MirrorSourceConnector:383)
org.apache.kafka.common.errors.UnsupportedVersionException: The broker does not support CREATE_PARTITIONS

去掉创建topic的特性
测试