说明
介绍在Mac上,使用Quarkus开发K8S Operator。
环境配置
- JAVA_HOME
需要jdk 8或者11。设置JAVA_HOME
环境变量:export JAVA_HOME=$(/usr/libexec/java_home)
export PATH=$JAVA_HOME/bin:$PATH
- 将graavlm解压,配置环境变量
# graalvm解压目录为 $HOME/workspace/graalvm-ce-19.2.1,需要设置到 `Contents/Home`目录。
export GRAALVM_HOME=$HOME/workspace/graalvm-ce-19.2.1/Contents/Home
- 安装
native-image
${GRAALVM_HOME}/bin/gu install native-image
开发
- 创建Quarkus工程
mvn io.quarkus:quarkus-maven-plugin:1.0.1.Final:create \
-DprojectGroupId=io.corestack.kubeedge \
-DprojectArtifactId=cloud-controller \
-Dextensions="kubernetes-client,spring-web" \
-DclassName="io.corestack.kubeedge.cloud.api.DemoApi" \
-Dpath="/greeting"
-Dextensions
中指明两个扩展包,kubernetes-client
和spring-web
。spring-web
包括了Spring相关的依赖(主要是@Controller
等Rest相关),其间接依赖了spring-di
(主要是@Service
,@Bean
等注解)。⚠️ Quarkus需要依赖 Maven 3.5.3+,否则会报错:
[INFO] ------------------------------------------------------------------------
[INFO] Building Maven Stub Project (No POM) 1
[INFO] ------------------------------------------------------------------------
[INFO]
[INFO] --- quarkus-maven-plugin:1.0.1.Final:create (default-cli) @ standalone-pom ---
[INFO] ------------------------------------------------------------------------
[INFO] BUILD FAILURE
[INFO] ------------------------------------------------------------------------
[INFO] Total time: 1.350 s
[INFO] Finished at: 2019-12-15T22:59:29+08:00
[INFO] Final Memory: 20M/364M
[INFO] ------------------------------------------------------------------------
[WARNING] The requested profile "profile-default" could not be activated because it does not exist.
[ERROR] Failed to execute goal io.quarkus:quarkus-maven-plugin:1.0.1.Final:create (default-cli) on project standalone-pom: Execution default-cli of goal io.quarkus:quarkus-maven-plugin:1.0.1.Final:create failed: An API incompatibility was encountered while executing io.quarkus:quarkus-maven-plugin:1.0.1.Final:create: java.lang.NoSuchMethodError: org.eclipse.aether.transfer.TransferResource.<init>(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/io/File;Lorg/eclipse/aether/RequestTrace;)V
[ERROR] -----------------------------------------------------
[ERROR] realm = plugin>io.quarkus:quarkus-maven-plugin:1.0.1.Final
[ERROR] strategy = org.codehaus.plexus.classworlds.strategy.SelfFirstStrategy
- 构建native包
mvn clean package -Pnative -Dnative-image.docker-build=true -Dmaven.test.skip=true
这个过程比较消耗资源,时间稍长。成功构建输出如下:
[cloud-controller-1.0-SNAPSHOT-runner:10155] classlist: 10,895.67 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155] (cap): 1,508.99 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155] setup: 3,177.11 ms
23:32:01,603 INFO [org.jbo.threads] JBoss Threads version 3.0.0.Final
[cloud-controller-1.0-SNAPSHOT-runner:10155] (typeflow): 32,318.03 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155] (objects): 16,007.48 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155] (features): 1,926.39 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155] analysis: 52,210.21 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155] (clinit): 1,068.27 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155] universe: 2,633.68 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155] (parse): 3,877.82 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155] (inline): 5,853.83 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155] (compile): 39,937.28 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155] compile: 52,462.46 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155] image: 4,570.19 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155] write: 1,559.84 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155] [total]: 128,015.07 ms
[INFO] [io.quarkus.deployment.QuarkusAugmentor] Quarkus augmentation completed in 131431ms
- 构建native镜像
在工程目录下执行:docker build -t io.corestack.kubeedge/cloud-controller:0.1 -f src/main/docker/Dockerfile.native .
测试
在K3S中部署测试。K3S的安装参考。
K3S中默认使用的是containerd
作为容器运行时,而不是docker
。所以不能直接使用docker本地镜像。
将导出的镜像拷贝至K3S环境,使用 ctr 将镜像导入至 containerd
ctr -a /run/k3s/containerd/containerd.sock -n k8s.io images import —base-name io.corestack.kubeedge/cloud-controller:0.1 cloud.tar
<a name="WebSocket"></a>
## WebSocket
在工程中通过 `mvn quarkus:add-extensions -Dextensions=quarkus-undertow-websockets`增加扩展包。
<a name="Server"></a>
### Server
```java
import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
@ServerEndpoint(
value = "/node/{name}"
)
@Service
public class NodeSocket {
private final static Logger LOGGER = LoggerFactory.getLogger(NodeSocket.class);
@OnOpen
public void onOpen(Session session, @PathParam("name") String name) {
}
@OnMessage
public void onMessage(String message, @PathParam("name") String name) {
LOGGER.info("Received message: {}.", message);
}
}
- 通过
@ServerEndpoint
声明服务端。@OnMessage
声明的方法用来接收消息,默认情况下,只能接收文本消息。
Client
import javax.websocket.*;
import java.io.IOException;
import java.net.URI;
@Service
public class EdgeSocket {
private final static Logger LOGGER = LoggerFactory.getLogger(EdgeSocket.class);
private Session session;
public EdgeSocket() throws IOException, DeploymentException {
this.session = ContainerProvider.getWebSocketContainer().connectToServer(
EdgeClient.class,
URI.create("http://localhost:8080/node/edge")
);
}
public void send(String edgeMessage) {
this.session.getAsyncRemote().sendText(edgeMessage, new SendHandler() {
@Override
public void onResult(SendResult sendResult) {
LOGGER.info("Send result: {}, {}", sendResult.isOK(), sendResult.getException());
}
});
}
@ClientEndpoint
private static final class EdgeClient {
@OnOpen
public void onOpen(Session session) {
}
}
}
- 首先,需要通过
@ClientEndpoint
声明客户端,与服务端对应,同样包含四个生命周期方法。- 通过
ContainerProvider.getWebSocketContainer().connectToServer
连接服务端。- 通过
session.getAsyncRemote().sendText()
发送文本内容消息。
编解码
默认情况下,socket双方只能发送和接收文本消息。如果要发送对象,需要指定编解码器。
- 首先,客户端通过
session.getAsyncRemote().sendObject
方法发送对象到服务端。需要在客户端定义编码器,将对象转成String。
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;
public class MessageEncoder implements Encoder.Text {
private ObjectMapper objectMapper;
@Override
public String encode(Object o) throws EncodeException {
try {
return objectMapper.writeValueAsString(o);
} catch (JsonProcessingException e) {
throw new EncodeException(o, e.getMessage(), e);
}
}
@Override
public void init(EndpointConfig endpointConfig) {
this.objectMapper = new ObjectMapper();
}
@Override
public void destroy() {
}
}
在客户端Endpoint上,指定使用该编码器。
@ClientEndpoint (
encoders = {MessageEncoder.class}
)
private static final class EdgeClient {
@OnOpen
public void onOpen(Session session) {
}
}
- 服务端与客户端对应,需要定义解码器,将接收的String转为需要的对象。 ```java import com.fasterxml.jackson.databind.ObjectMapper; import io.corestack.kubeedge.cloud.message.EdgeMessage; import io.fabric8.kubernetes.api.model.Node; import org.slf4j.Logger; import org.slf4j.LoggerFactory;
import javax.websocket.DecodeException; import javax.websocket.Decoder; import javax.websocket.EndpointConfig; import java.io.IOException;
public class EdgeMessageDecoder implements Decoder.Text
private static final Logger LOGGER = LoggerFactory.getLogger(EdgeMessageDecoder.class);
private ObjectMapper objectMapper;
@Override
public EdgeMessage decode(String s) throws DecodeException {
try {
LOGGER.info("Received: {}.", s);
EdgeMessage ed = objectMapper.readValue(s, EdgeMessage.class);
LOGGER.info("source: {}, data: {}.", ed.getSource(), ed.getData());
if(ed.getSource().equals("node")) {
ed.setResource(objectMapper.convertValue(ed.getData(), Node.class));
}
return ed;
} catch (IOException e) {
LOGGER.warn("Error decode.", e);
throw new DecodeException(s, e.getMessage(), e);
}
}
@Override
public boolean willDecode(String s) {
return s != null;
}
@Override
public void init(EndpointConfig endpointConfig) {
objectMapper = new ObjectMapper();
}
@Override
public void destroy() {
}
}
> 这里使用JSON格式。
<br />同样,在服务端Endpoint上注明使用该解码器。
```java
@ServerEndpoint(
value = "/node/{name}",
decoders = {EdgeMessageDecoder.class}
)
@Service
public class NodeSocket {
private final static Logger LOGGER = LoggerFactory.getLogger(NodeSocket.class);
@OnOpen
public void onOpen(Session session, @PathParam("name") String name) {
}
@OnMessage
public void onMessage(EdgeMessage message, @PathParam("name") String name) {
LOGGER.info("Received message: {}.", message);
Node node = (Node) message.getResource();
LOGGER.info("Node: {}.", node);
LOGGER.info("Node: {}.", node.getMetadata().getName());
}
}
这里的
@OnMessage
注解的方法中,就使用了EdgeMessage
对象作为消息体。