zookeeper在实际应用中,其有一个重要功能,就是实现master选举。本文主要介绍应用服务器集群时,如何使用zookeeper来实现服务器选主。

应用场景:

经常会有这样的需求,服务器集群时,一个主服务器用于分发任务,其他的从服务器用于执行任务。当主服务器挂掉时,其中一个从节点需要成为主节点,并继续提供服务,当挂掉的节点恢复时,他能自动加入到从节点中,继续服务。

zookeeper介绍

官方文档上这么解释zookeeper,它是一个分布式服务框架,是Apache Hadoop 的一个子项目,它主要是用来解决分布式应用中经常遇到的一些数据管理问题,如:统一命名服务、状态同步服务、集群管理、分布式应用配置项的管理等。

数据结构

zk维护的是一种类似标准文件系统的数据结构。

如下图:

zk数据结构

图中的每一个结点都是一个znode,特殊的是znode是可以存储数据的。同时也和文件系统一样,我们可以增加或删除节点,在节点下增加或删除子节点。

对于znode来说,按存活时间来说,zk提供了如下几种类型:

  1. 持久节点

    持久节点的存活是不依赖于客户端是否断开,只有客户端显示去删除才能清理

  2. 临时节点

    临时节点会在客户端断开时,自动删除

注:节点在创建时就已经确定了其具体类型,在其生命周期内,是不能做相应的类型变更。

另外zk在上述节点基础上,提供一种节点特性,即普通节点/顺序节点

  1. 普通节点

    也就是节点具体其原始属性(比如:持久/临时性)

  2. 顺序节点

    节点在原有特性(持久/临时)基础上,增加了顺序特性。

    创建节点时,若类型含顺序特性,则在节点生成时,zk会自动的在路径后面追加一个有序的数字,且该数字是其父节点下唯一。

    例:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    #创建顺序节点命令
    create -s /app/info1 app

    #输出:
    Created /app/info10000000001

    #这样就创建了一个持久顺序节点 /app/info10000000001。
    #如果再执行此命令,则会生成节点 /app/info10000000002。
    #如果在create -s再添加-e参数,则可以创建一个临时顺序节点。
watcher机制

这里只简要说明下,在创建节点时,可以在节点上订阅事件。节点发生变化,就会触发该事件,比如节点的删除、变更等。

在选主应用上,主要对节点删除感兴趣,当监听到节点删除时,自动触发选主操作。

zk选主应用

选主所具备的功能:

  1. 所有节点竞争时,只有一个节点会成功成为主节点,失败的节点自动成为从节点
  2. 主节点失效时,会触发新一轮的选举
  3. 从节点可以获取主节点信息,主节点可知道所有从节点的信息

整体逻辑如下

选主逻辑图

选主过程

  1. 初始化时,即系统启动,都向/leader_info注册临时节点,注册成功的即为主节点,失败的为从节点
  2. 从节点从/leader_info中获取主节点信息,并在该节点上添加watcher监听(删除事件监听)

从节点监听到删除事件,从节点触发选主操作。

我们如何能获取到其他节点的信息呢,比如:主节点需知道有哪些从节点,从节点需知道主节点是哪个。

服务启动时,我们向/servers下注册临时子节点,该节点保存有当前服务的ip,端口等信息。各个节点通过获取/servers下的子节点集合,就可以获取到各节点的信息。

可以通过获取/leader_info信息,来获取主节点的信息。

本例采用的是springboot + zk

代码如下:

zk连接类:ZkConfig.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
package com.example.zk.test;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ZkConfig {

@Value("${zk.host}")
private String connectString;

@Value("${zk.timeout}")
private int timeout;

@Bean(name = "zkClient")
public ZooKeeper getZk() {
ZooKeeper zooKeeper = null;
try {
//连接成功后,会回调watcher监听,此连接操作是异步的,执行完new语句后,直接调用后续代码
// 可指定多台服务地址 127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
zooKeeper = new ZooKeeper(connectString, timeout, new Watcher() {
@Override
public void process(WatchedEvent event) {
}
});
} catch (Exception e) {
}
return zooKeeper;
}
}

zk选主服务: RaceLeaderService.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
package com.example.zk.test;

import ch.qos.logback.core.hook.ShutdownHookBase;
import org.apache.zookeeper.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import org.springframework.util.CollectionUtils;

import java.io.UnsupportedEncodingException;
import java.util.List;

/**
* 竞争leader类
* 继承shutdownHook,关服务时,触发断开zk连接
*/
@Service
public class RaceLeaderService extends ShutdownHookBase {

@Autowired
private ZooKeeper zooKeeper;

private Status status;

//主节点(临时节点),选主时竞争的节点
private static String LEADER_INFO = "/leader_info";

//服务信息父节点(持久节点),其子节点记录有各服务信息
private static String SERVER_INFO_PRREFIX = "/servers";

//示例,使用端口作为不同服务器的区分,
//本例在同一台机器上,分三个端口部署同样的服务
@Value("${server.port}")
private String port;

/**
* 首次启动,需向zk注册持久节点/servers,
* 因为保存有服务器信息的临时节点的父节点为/servers,
* 而临时节点的父节点必须为持久节点(zk特性)
*/
public void init() {
try {
zooKeeper.create(SERVER_INFO_PRREFIX, port.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

/**
* 向/servers下注册子节点,以保存当前服务器的连接信息
*/
public void register() {

try {
zooKeeper.create(SERVER_INFO_PRREFIX + "/" + port, port.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);

} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}

}

/**
* 通过/servers获取其子节点信息,进而获取所有存活节点的信息
*/
public void getServers() {

List<String> children = null;
try {
children = zooKeeper.getChildren(SERVER_INFO_PRREFIX, true);
} catch (KeeperException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}

if (CollectionUtils.isEmpty(children)) return;

for (String str : children) {
if (status == Status.LEADER && str.equals(port)) {
System.out.println("leader:" + str);
} else {
System.out.println("follower:" + str);
}
}
}

/**
* 选主方法
*/
public void lookingForLeader() {
status = Status.LOOKING;
try {
String leaderInfo = port;
// 需要注意这里创建的是临时节点
zooKeeper.create(LEADER_INFO, leaderInfo.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL);
// 如果上一步没有抛异常,说明自己已经是leader了
status = Status.LEADER;
String logMsg = port + " is leader";
System.out.println(logMsg);
} catch (KeeperException.NodeExistsException e) {
// 节点已经存在,说明leader已经被别人注册成功了,自己是follower
status = Status.FOLLOWER;
try {
byte[] leaderInfoBytes = zooKeeper.getData(LEADER_INFO, event -> {
//这里watch了节点删除事件,当主节点失效时,触发选主操作
if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
lookingForLeader();
}
}, null);
String logMsg = port + " is follower, master is " + new String(leaderInfoBytes, "UTF-8");
System.out.println(logMsg);
} catch (KeeperException.NoNodeException e1) {
// 如果这时异常,说明上一步(zookeeper.getData)发生异常
// 即主节点已经不存在,这里要触发选主操作
lookingForLeader();
} catch (KeeperException | InterruptedException | UnsupportedEncodingException e1) {
e1.printStackTrace();
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}

/**
* 关机时,自动关闭zk连接
*/
public void shutdown() {
try {
if (zooKeeper != null) {
zooKeeper.close();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}

/**
* 获取当前服务器的节点状态,leader/follower
*
* @return
*/
public Status getStatus() {
return status;
}

/**
* shutdown hook
*/
@Override
public void run() {
shutdown();
}

// 当前节点的状态,节点的状态必须在这三个中的一个
public enum Status {
LOOKING, // 选举中
LEADER, // 选举完毕,当前节点为leader
FOLLOWER; // 选举完毕,当前节点为follower
}
}

这里使用的是springboot+zk,启动操作类:LeaderInitService.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
package com.example.zk.test;

import com.example.zk.redis.RedisUtil;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@Component
public class LeaderInitService implements InitializingBean {

@Autowired
RaceLeaderService raceLeaderService;

@Value("${server.port}")
String port;

@Autowired
RedisUtil redisUtil;

/**
* bean初始完后,启动如下线程
*
* @throws Exception
*/
@Override
public void afterPropertiesSet() throws Exception {

//初次启动,需注册/servers
raceLeaderService.init();
//当前服务向/servers下注册临时子节点,保存自己的服务信息
raceLeaderService.register();
//发起选主操作
raceLeaderService.lookingForLeader();

//任务调度执行
new Thread(new Runnable() {
@Override
public void run() {
while (true) {
try {
if (raceLeaderService.getStatus().equals(RaceLeaderService.Status.LEADER)) {
System.out.println(port + "是master,分配任务");
//可获取到所有的服务信息,包括从节点
raceLeaderService.getServers();
} else {
//从节点用于处理任务
System.out.println(port + "是follower,处理任务");
}
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
} catch (Exception ex) {
System.out.println("error");
}
}
}
}).start();
}
}

pom.xml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.9.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>zk</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>zk</name>
<description>Demo project for Spring Boot</description>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.5.5</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

application.yml

1
2
3
4
5
6
7
8
9
server:
port: 10025
spring:
application:
name: zk-service

zk:
host: 127.0.0.1:2181
timeout: 1000

总结

zk在创建节点时,只有一个节点能创建成功,其他客户端会创建失败。通过这种方式,完成资源竞争。另外创建的节点可保存一些信息,可利用这点来完成服务器信息及状态的记录。

当客户端断开zk连接时,其创建的对应临时节点就会被删除,同时利用watcher机制,来触发新一轮选举。

另外也可以利用这种机制来监测当前所有服务节点存活状态。

 上一页

 评论