Overview

This guide describes how to develop a Kubernetes (K8S) Operator with Quarkus on a Mac.

Environment setup

  • JAVA_HOME
    JDK 8 or 11 is required. Set the JAVA_HOME environment variable:
    export JAVA_HOME=$(/usr/libexec/java_home)
    export PATH=$JAVA_HOME/bin:$PATH
  • Install Maven, version 3.5.3 or later.

  • GraalVM

    • Download GraalVM

      ⚠️ At the time of writing, Quarkus 1.0.1 is supported only by GraalVM 19.2.1.

    • Extract GraalVM and configure the environment variable
      # GraalVM is extracted to $HOME/workspace/graalvm-ce-19.2.1; the variable must point to the `Contents/Home` directory.
      export GRAALVM_HOME=$HOME/workspace/graalvm-ce-19.2.1/Contents/Home
    • Install native-image
      ${GRAALVM_HOME}/bin/gu install native-image
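
As a quick sanity check of the setup above, the following sketch resolves where `gu` and `native-image` are expected to live under `GRAALVM_HOME`. The class `GraalvmTools` and its `tool` method are hypothetical helpers introduced only for illustration; they are not part of GraalVM or Quarkus.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper: locates GraalVM tools from an environment map. */
final class GraalvmTools {

    /** Returns $GRAALVM_HOME/bin/<name>, or empty when GRAALVM_HOME is unset. */
    static Optional<Path> tool(Map<String, String> env, String name) {
        String home = env.get("GRAALVM_HOME");
        if (home == null || home.isEmpty()) {
            return Optional.empty();
        }
        return Optional.of(Paths.get(home, "bin", name));
    }

    public static void main(String[] args) {
        // System.getenv() exposes the environment of the current process.
        String report = tool(System.getenv(), "native-image")
                .map(path -> path + " executable: " + Files.isExecutable(path))
                .orElse("GRAALVM_HOME is not set");
        System.out.println(report);
    }
}
```

If the reported path is not executable, the `gu install native-image` step has most likely not been run against this GraalVM installation.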

Development

  • Create the Quarkus project
    mvn io.quarkus:quarkus-maven-plugin:1.0.1.Final:create \
        -DprojectGroupId=io.corestack.kubeedge \
        -DprojectArtifactId=cloud-controller \
        -Dextensions="kubernetes-client,spring-web" \
        -DclassName="io.corestack.kubeedge.cloud.api.DemoApi" \
        -Dpath="/greeting"

-Dextensions names two extensions: kubernetes-client and spring-web. spring-web brings in the Spring-related dependencies (mainly the REST-related ones such as @Controller) and transitively depends on spring-di (mainly annotations such as @Service and @Bean).

⚠️ Quarkus requires Maven 3.5.3+. With an older Maven, project creation fails with an error like this:

    [INFO] ------------------------------------------------------------------------
    [INFO] Building Maven Stub Project (No POM) 1
    [INFO] ------------------------------------------------------------------------
    [INFO]
    [INFO] --- quarkus-maven-plugin:1.0.1.Final:create (default-cli) @ standalone-pom ---
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD FAILURE
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 1.350 s
    [INFO] Finished at: 2019-12-15T22:59:29+08:00
    [INFO] Final Memory: 20M/364M
    [INFO] ------------------------------------------------------------------------
    [WARNING] The requested profile "profile-default" could not be activated because it does not exist.
    [ERROR] Failed to execute goal io.quarkus:quarkus-maven-plugin:1.0.1.Final:create (default-cli) on project standalone-pom: Execution default-cli of goal io.quarkus:quarkus-maven-plugin:1.0.1.Final:create failed: An API incompatibility was encountered while executing io.quarkus:quarkus-maven-plugin:1.0.1.Final:create: java.lang.NoSuchMethodError: org.eclipse.aether.transfer.TransferResource.<init>(Ljava/lang/String;Ljava/lang/String;Ljava/lang/String;Ljava/io/File;Lorg/eclipse/aether/RequestTrace;)V
    [ERROR] -----------------------------------------------------
    [ERROR] realm = plugin>io.quarkus:quarkus-maven-plugin:1.0.1.Final
    [ERROR] strategy = org.codehaus.plexus.classworlds.strategy.SelfFirstStrategy
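
Because the failure above only says `NoSuchMethodError`, it can help to check the Maven version explicitly before creating the project. The sketch below compares dotted numeric versions; `MavenVersionCheck` and `atLeast` are hypothetical names, and qualifiers such as `-beta-1` are deliberately not handled.

```java
/** Hypothetical helper: compares dotted numeric versions such as "3.5.3". */
final class MavenVersionCheck {

    /** Minimum Maven version stated in this guide for Quarkus 1.0.1. */
    static final String MINIMUM = "3.5.3";

    /** Returns true when actual is greater than or equal to required. */
    static boolean atLeast(String actual, String required) {
        String[] a = actual.split("\\.");
        String[] r = required.split("\\.");
        int length = Math.max(a.length, r.length);
        for (int i = 0; i < length; i++) {
            // Missing components count as 0, so "3.6" is treated as "3.6.0".
            int left = i < a.length ? Integer.parseInt(a[i]) : 0;
            int right = i < r.length ? Integer.parseInt(r[i]) : 0;
            if (left != right) {
                return left > right;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        // Pass the version reported by `mvn -v` as the first argument.
        String version = args.length > 0 ? args[0] : MINIMUM;
        System.out.println(version + " is sufficient: " + atLeast(version, MINIMUM));
    }
}
```

The version string to pass in is the one printed on the first line of `mvn -v`.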
  • Build the native package
    mvn clean package -Pnative -Dnative-image.docker-build=true -Dmaven.test.skip=true

This step is resource-intensive and takes a while. A successful build prints output like the following:

[cloud-controller-1.0-SNAPSHOT-runner:10155]    classlist:  10,895.67 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155]        (cap):   1,508.99 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155]        setup:   3,177.11 ms
23:32:01,603 INFO  [org.jbo.threads] JBoss Threads version 3.0.0.Final
[cloud-controller-1.0-SNAPSHOT-runner:10155]   (typeflow):  32,318.03 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155]    (objects):  16,007.48 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155]   (features):   1,926.39 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155]     analysis:  52,210.21 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155]     (clinit):   1,068.27 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155]     universe:   2,633.68 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155]      (parse):   3,877.82 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155]     (inline):   5,853.83 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155]    (compile):  39,937.28 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155]      compile:  52,462.46 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155]        image:   4,570.19 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155]        write:   1,559.84 ms
[cloud-controller-1.0-SNAPSHOT-runner:10155]      [total]: 128,015.07 ms
[INFO] [io.quarkus.deployment.QuarkusAugmentor] Quarkus augmentation completed in 131431ms
  • Build the container image for the native executable
    Run the following in the project directory:
    docker build -t io.corestack.kubeedge/cloud-controller:0.1 -f src/main/docker/Dockerfile.native .
    

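Testing

Deploy and test on K3S. For installing K3S, see the K3S installation documentation.

K3S uses containerd, not Docker, as its default container runtime, so images in the local Docker image store cannot be used directly.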

  • Load the image with ctr

    First, export the Docker image:

    ```bash
    docker save io.corestack.kubeedge/cloud-controller:0.1 -o cloud.tar
    ```

    Then copy the exported archive to the K3S host and import it into containerd with ctr:

    ```bash
    ctr -a /run/k3s/containerd/containerd.sock -n k8s.io images import --base-name io.corestack.kubeedge/cloud-controller:0.1 cloud.tar
    ```
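
When this transfer is scripted from Java tooling, the two commands can be assembled as argument lists suitable for `ProcessBuilder`. This is only a sketch: `ImageTransfer` and its methods are hypothetical names, the arguments simply mirror the commands in this section, and nothing is executed here because the two commands run on different machines.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical helper: builds the argument lists for the export and import steps. */
final class ImageTransfer {

    /** Arguments for exporting a Docker image to a tar archive on the build machine. */
    static List<String> dockerSave(String image, String tarFile) {
        return Arrays.asList("docker", "save", image, "-o", tarFile);
    }

    /** Arguments for importing the archive into the k8s.io namespace on the K3S host. */
    static List<String> ctrImport(String socket, String image, String tarFile) {
        return Arrays.asList("ctr", "-a", socket, "-n", "k8s.io",
                "images", "import", "--base-name", image, tarFile);
    }

    public static void main(String[] args) {
        String image = "io.corestack.kubeedge/cloud-controller:0.1";
        System.out.println(String.join(" ", dockerSave(image, "cloud.tar")));
        System.out.println(String.join(" ",
                ctrImport("/run/k3s/containerd/containerd.sock", image, "cloud.tar")));
    }
}
```

Each list can be handed to `new ProcessBuilder(list)` on the machine where the corresponding tool is installed.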




<a name="WebSocket"></a>
## WebSocket

Add the extension to the project with `mvn quarkus:add-extensions -Dextensions=quarkus-undertow-websockets`.

<a name="Server"></a>
### Server

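```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.websocket.OnMessage;
import javax.websocket.OnOpen;
import javax.websocket.Session;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;

@ServerEndpoint(
        value = "/node/{name}"
)
@Service
public class NodeSocket {

    private static final Logger LOGGER = LoggerFactory.getLogger(NodeSocket.class);

    @OnOpen
    public void onOpen(Session session, @PathParam("name") String name) {
        // Called once when a client connects; nothing to set up in this example.
    }

    @OnMessage
    public void onMessage(String message, @PathParam("name") String name) {
        // With a String parameter, the method receives text messages.
        LOGGER.info("Received message: {}.", message);
    }
}
```

  1. @ServerEndpoint declares the server endpoint.
  2. The method annotated with @OnMessage receives messages. As declared here, with a String parameter, it receives text messages only.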
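
To make the `/node/{name}` template and `@PathParam("name")` more concrete, the sketch below shows how such a template can bind a request path to named parameters. It is an illustration only, not the code the WebSocket container actually runs; `PathTemplate` and `match` are hypothetical names.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical illustration of binding "/node/{name}" style templates. */
final class PathTemplate {

    /** Returns the bound parameters, or empty when the path does not match. */
    static Optional<Map<String, String>> match(String template, String path) {
        String[] expected = template.split("/");
        String[] actual = path.split("/");
        if (expected.length != actual.length) {
            return Optional.empty();
        }
        Map<String, String> params = new LinkedHashMap<>();
        for (int i = 0; i < expected.length; i++) {
            String segment = expected[i];
            if (segment.startsWith("{") && segment.endsWith("}")) {
                // "{name}" captures the whole path segment under the key "name".
                params.put(segment.substring(1, segment.length() - 1), actual[i]);
            } else if (!segment.equals(actual[i])) {
                return Optional.empty();
            }
        }
        return Optional.of(params);
    }

    public static void main(String[] args) {
        System.out.println(match("/node/{name}", "/node/edge"));
        System.out.println(match("/node/{name}", "/pod/edge"));
    }
}
```

For a client connecting to `/node/edge`, the `name` argument of `onOpen` and `onMessage` would therefore be `edge`.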

### Client

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Service;

import javax.websocket.*;
import java.io.IOException;
import java.net.URI;

@Service
public class EdgeSocket {

    private static final Logger LOGGER = LoggerFactory.getLogger(EdgeSocket.class);

    private final Session session;

    public EdgeSocket() throws IOException, DeploymentException {
        // WebSocket endpoints are addressed with the ws:// (or wss://) scheme.
        this.session = ContainerProvider.getWebSocketContainer().connectToServer(
                EdgeClient.class,
                URI.create("ws://localhost:8080/node/edge")
        );
    }

    public void send(String edgeMessage) {
        // Send asynchronously; the handler runs once the send completes or fails.
        this.session.getAsyncRemote().sendText(edgeMessage, sendResult ->
                LOGGER.info("Send result: {}, {}", sendResult.isOK(), sendResult.getException()));
    }

    @ClientEndpoint
    private static final class EdgeClient {

        @OnOpen
        public void onOpen(Session session) {
        }

    }
}
```

  1. First, declare the client with @ClientEndpoint. Like the server endpoint, it supports the four lifecycle methods (@OnOpen, @OnMessage, @OnError, @OnClose).
  2. Connect to the server with ContainerProvider.getWebSocketContainer().connectToServer.
  3. Send text messages with session.getAsyncRemote().sendText().
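
WebSocket URIs use the `ws` and `wss` schemes defined in RFC 6455. When only an HTTP base URL is available in configuration, a small helper can derive the endpoint URI to pass to `connectToServer`. This is a sketch under that assumption; `WebSocketUris` and `toWebSocketUri` are hypothetical names.

```java
import java.net.URI;

/** Hypothetical helper: derives a ws:// or wss:// endpoint URI from a base URL. */
final class WebSocketUris {

    /** Maps http to ws and https to wss, keeps ws/wss, and appends the endpoint path. */
    static URI toWebSocketUri(String baseUrl, String endpointPath) {
        URI base = URI.create(baseUrl);
        String scheme = base.getScheme();
        String wsScheme;
        if ("http".equals(scheme) || "ws".equals(scheme)) {
            wsScheme = "ws";
        } else if ("https".equals(scheme) || "wss".equals(scheme)) {
            wsScheme = "wss";
        } else {
            throw new IllegalArgumentException("Unsupported scheme: " + scheme);
        }
        // The authority carries host and port, for example "localhost:8080".
        return URI.create(wsScheme + "://" + base.getAuthority() + endpointPath);
    }

    public static void main(String[] args) {
        System.out.println(toWebSocketUri("http://localhost:8080", "/node/edge"));
    }
}
```

The returned `URI` can be used directly as the second argument of `connectToServer`.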

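### Encoding and decoding

Out of the box, the two sides of the socket exchange plain text (or raw binary) messages. To send Java objects, an encoder and a decoder must be specified.

  • First, the client sends an object to the server with session.getAsyncRemote().sendObject. The client needs an encoder that converts the object into a String.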
```java
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

import javax.websocket.EncodeException;
import javax.websocket.Encoder;
import javax.websocket.EndpointConfig;

public class MessageEncoder implements Encoder.Text<Object> {

    private ObjectMapper objectMapper;

    @Override
    public String encode(Object o) throws EncodeException {
        try {
            // Serialize the outgoing object to a JSON string.
            return objectMapper.writeValueAsString(o);
        } catch (JsonProcessingException e) {
            throw new EncodeException(o, e.getMessage(), e);
        }
    }

    @Override
    public void init(EndpointConfig endpointConfig) {
        this.objectMapper = new ObjectMapper();
    }

    @Override
    public void destroy() {
        // Nothing to release.
    }
}
```

Declare the encoder on the client endpoint:

```java
@ClientEndpoint(
        encoders = {MessageEncoder.class}
)
private static final class EdgeClient {

    @OnOpen
    public void onOpen(Session session) {
    }

}
```
  • Correspondingly, the server needs a decoder that converts the received String into the required object.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import io.corestack.kubeedge.cloud.message.EdgeMessage;
import io.fabric8.kubernetes.api.model.Node;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.websocket.DecodeException;
import javax.websocket.Decoder;
import javax.websocket.EndpointConfig;
import java.io.IOException;

public class EdgeMessageDecoder implements Decoder.Text<EdgeMessage> {

    private static final Logger LOGGER = LoggerFactory.getLogger(EdgeMessageDecoder.class);

    private ObjectMapper objectMapper;

    @Override
    public EdgeMessage decode(String s) throws DecodeException {
        try {
            LOGGER.info("Received: {}.", s);
            EdgeMessage ed = objectMapper.readValue(s, EdgeMessage.class);
            LOGGER.info("source: {}, data: {}.", ed.getSource(), ed.getData());

            // Null-safe comparison: a message without a source is left unconverted.
            if ("node".equals(ed.getSource())) {
                ed.setResource(objectMapper.convertValue(ed.getData(), Node.class));
            }
            return ed;
        } catch (IOException e) {
            LOGGER.warn("Error decode.", e);
            throw new DecodeException(s, e.getMessage(), e);
        }
    }

    @Override
    public boolean willDecode(String s) {
        return s != null;
    }

    @Override
    public void init(EndpointConfig endpointConfig) {
        objectMapper = new ObjectMapper();
    }

    @Override
    public void destroy() {
        // Nothing to release.
    }
}
```

> JSON is used as the message format here.

Likewise, declare the decoder on the server endpoint:
```java
@ServerEndpoint(
        value = "/node/{name}",
        decoders = {EdgeMessageDecoder.class}
)
@Service
public class NodeSocket {

    private final static Logger LOGGER = LoggerFactory.getLogger(NodeSocket.class);

    @OnOpen
    public void onOpen(Session session, @PathParam("name") String name) {

    }

    @OnMessage
    public void onMessage(EdgeMessage message, @PathParam("name") String name) {
        LOGGER.info("Received message: {}.", message);
        Node node = (Node) message.getResource();
        LOGGER.info("Node: {}.", node);
        LOGGER.info("Node: {}.", node.getMetadata().getName());
    }
}
```

Here the method annotated with @OnMessage takes an EdgeMessage object as the message body.
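
The encoder and decoder contract can be tried without a WebSocket container. The sketch below mirrors the encode, willDecode and decode steps using only the JDK. It assumes a hypothetical `SimpleMessage` type standing in for EdgeMessage, and a `source|data` text format standing in for the JSON produced by Jackson.

```java
import java.util.Objects;

/** Hypothetical message type standing in for EdgeMessage. */
final class SimpleMessage {
    final String source;
    final String data;

    SimpleMessage(String source, String data) {
        this.source = Objects.requireNonNull(source);
        this.data = Objects.requireNonNull(data);
    }
}

/** Hypothetical codec mirroring the encode / willDecode / decode steps. */
final class SimpleMessageCodec {

    /** Encoder side: object to text. */
    static String encode(SimpleMessage message) {
        if (message.source.isEmpty() || message.source.indexOf('|') >= 0) {
            throw new IllegalArgumentException("source must be non-empty and must not contain '|'");
        }
        return message.source + "|" + message.data;
    }

    /** Decoder side: cheap check that the text has the expected shape. */
    static boolean willDecode(String text) {
        return text != null && text.indexOf('|') > 0;
    }

    /** Decoder side: text back to object, splitting on the first '|'. */
    static SimpleMessage decode(String text) {
        if (!willDecode(text)) {
            throw new IllegalArgumentException("Not a source|data message: " + text);
        }
        int separator = text.indexOf('|');
        return new SimpleMessage(text.substring(0, separator), text.substring(separator + 1));
    }

    public static void main(String[] args) {
        String wire = encode(new SimpleMessage("node", "edge-1"));
        SimpleMessage decoded = decode(wire);
        System.out.println(wire + " -> source=" + decoded.source + ", data=" + decoded.data);
    }
}
```

In the real endpoints, the container calls the registered encoder when `sendObject` is used, and calls the decoder before invoking the `@OnMessage` method.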