Java 作为客户端 grpc 使用

119次阅读
没有评论

共计 6062 个字符,预计需要花费 16 分钟才能阅读完成。

1. 引入依赖以及插件

依赖包如下:

<!--  处理 .proto 文件并生成 Java 类 -->
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-protobuf</artifactId>
    <version>1.57.2</version>
</dependency>

<!-- 生成 gRPC 客户端和服务器端的 Stub 代码 -->
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-stub</artifactId>
    <version>1.57.2</version>
</dependency>

<!-- 负责 gRPC 的底层 HTTP/2 网络通信-->
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-netty-shaded</artifactId>
    <version>1.57.2</version>
</dependency>

maven 插件:

<plugins>
    <!-- protobuf 插件,需要时打开即可 -->
    <plugin>
        <groupId>org.xolstice.maven.plugins</groupId>
        <artifactId>protobuf-maven-plugin</artifactId>
        <version>0.6.1</version>
        <configuration>
            <protocArtifact>com.google.protobuf:protoc:3.5.1-1:exe:${os.detected.classifier}</protocArtifact>
            <pluginId>grpc-java</pluginId>
            <pluginArtifact>io.grpc:protoc-gen-grpc-java:1.14.0:exe:${os.detected.classifier}</pluginArtifact>
        </configuration>
        <executions>
            <execution>
                <goals>
                    <goal>compile</goal>
                    <goal>compile-custom</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
</plugins>

<extensions>
    <extension>
        <groupId>kr.motd.maven</groupId>
        <artifactId>os-maven-plugin</artifactId>
        <version>1.7.1</version>
    </extension>
</extensions>

Java 作为客户端 grpc 使用

  • compile 负责生成 Protobuf 消息类,用于数据序列化和反序列化。

    • XXX.java(Protobuf 消息类):对应 .proto 文件中的 message 定义,作为数据传输对象 (DTO)。
    • XXXOrBuilder.java(接口):用于支持 Protobuf 生成的类的 Builder 模式。
  • compile-custom 作用是调用 protoc-gen-grpc-java 插件,生成 gRPC 相关的 Java 代码。

    • XXXGrpc.java(gRPC 服务类):包含 gRPC 服务器和客户端的存根 (stub)。
    • XXXGrpc.XxxBlockingStub(同步调用的客户端存根)
    • XXXGrpc.XxxFutureStub(异步调用的客户端存根)
    • XXXGrpc.XxxStub(非阻塞调用的客户端存根)
    • XXXGrpc.XXXImplBase(服务器端基类,服务端需要继承该类并实现具体逻辑)

2. 准备proto文件

power.proto

syntax = "proto3";

option java_multiple_files = false;
option java_package = "com.unipower.robot.server.component.grpc.power";

package powerManagement;

import "google/protobuf/empty.proto";

// 设备设置项
message DeviceSetting {
  string device = 1;    // 设备名称
  bool status = 2;      // 状态
  bool necessary = 3;   // 是否必要
}

// 设备电源控制请求
message DevicePowerControlRequest {

  // 设备控制指令。
  repeated DeviceSetting settings = 1;
}

// 设备电源控制响应
message DevicePowerControlResponse {

  // 状态码。
  // 成功:200;失败:500。
  int32 code = 1;

  // 操作结果消息。
  // 成功:操作成功;失败:操作失败,失败原因
  string message = 2;
}

message AppLayerPowerState {
  // 电池数据
  float voltage = 1;
  float current = 2;
  float rsoc = 3;
  float nominalCapacity = 4;
  uint32 cycleCount = 5;
  // ----

  // N1数据
  bool shutdownSignal = 6;
  bool buzzer = 7;
  bool motor = 8;
  // ----

  // N2数据
  bool appLayerCamera = 9;
  bool radar = 10;
  // ----

  // 数据采集时间戳
  uint64 timestamp = 11;

  // 可变部分
  string status = 12;
}

// L1E AppLayerPowerState.status JSON format
// "sideCharging": bool // 是否通过侧边充电中
// "bottomCharging": bool // 是否通过底部充电中
// "bottomChargeEnabled": bool // 底部充电开关状态
// "SC": bool // SC电源开关状态
// "controlBoardFan": bool // 控制板散热风扇开关状态
// "appBoardFan": bool // 应用板散热风扇开关状态
// "wiperFan": bool // 雨刷风扇开关状态
// "emergencyStopSignal": bool // 急停信号
// "alarmLight": bool // 告警灯开关状态

// 电源管理服务定义
service PowerManagementService {
  // 控制设备电源状态
  rpc SetDevicePowerState (DevicePowerControlRequest) returns (DevicePowerControlResponse);

  // 获取电源状态状态
  rpc GetAppLayerPowerStateOnce (google.protobuf.Empty) returns (AppLayerPowerState);
  rpc GetAppLayerPowerStateStream (google.protobuf.Empty) returns (stream AppLayerPowerState);
}

然后将 proto 文件放在 src/main/proto 目录下即可。

如果 proto 文件比较稳定,可以用插件编译后,将 target 目录下插件生成的代码直接拷贝到项目中,以免每次启动或者打包都重新编译 proto 文件。

3. 配置以及使用

配置 gRPC 客户端的连接。

GrpcClientConfig.java

@Configuration
public class GrpcClientConfig {

    @Value("${power.grpcAddress}")
    private String powerGrpcAddress;

    @Value("${power.grpcPort}")
    private int powerGrpcPort;

    // 心跳超时时间,单位s
    private static final int KEEP_ALIVE_TIME_OUT = 10;

    // 最大尝试重连次数
    private static final int RETRY_TIMES = 3;

    // 阻塞调用超时时间,单位s
    private static final int BLOCKING_REQUEST_TIME_OUT = 30;

    @Bean
    public ManagedChannel powerChannel() {
        return ManagedChannelBuilder.forAddress(powerGrpcAddress, powerGrpcPort)
                .usePlaintext()
                .keepAliveTime(KEEP_ALIVE_TIME_OUT, TimeUnit.SECONDS)
                .enableRetry()
                .maxRetryAttempts(RETRY_TIMES)
                .build();
    }

    /**
     * 同步阻塞的存根
     * @param powerChannel
     * @return
     */
    @Bean
    public PowerManagementServiceGrpc.PowerManagementServiceBlockingStub powerManagementServiceBlockingStub(ManagedChannel powerChannel) {
        return PowerManagementServiceGrpc.newBlockingStub(powerChannel)
                .withDeadlineAfter(BLOCKING_REQUEST_TIME_OUT, TimeUnit.SECONDS);
    }

    /**
     * 异步非阻塞的存根
     * @param powerChannel
     * @return
     */
    @Bean
    public PowerManagementServiceGrpc.PowerManagementServiceStub powerManagementServiceStub(ManagedChannel powerChannel) {
        return PowerManagementServiceGrpc.newStub(powerChannel);
    }
}

RobotPowerClient.java

@Slf4j
@Component
public class RobotPowerClient {
    @Autowired
    PowerManagementServiceGrpc.PowerManagementServiceBlockingStub powerManagementServiceBlockingStub;

    @Autowired
    PowerManagementServiceGrpc.PowerManagementServiceStub asyncPowerManagementServiceStub;

    /**
     * 设置各设备的电源状态
     * @param deviceSettings
     * @return
     */
    public boolean applyDeviceSettings(Power.DeviceSetting... deviceSettings){
        log.debug("调用 grpc 接口设置设备状态参数:{}", deviceSettings);
        Power.DevicePowerControlResponse response = powerManagementServiceBlockingStub.setDevicePowerState(
                Power.DevicePowerControlRequest.newBuilder()
                        .addAllSettings(Arrays.asList(deviceSettings))
                        .build()
        );
        log.debug("调用 grpc 接口设置设备状态结果:{}", response);
        return response.getCode() == ErrorCode.OK.getCode();
    }

    /**
     * 流式获取电源状态信息
     * @param callback
     */
    public void getAppLayerPowerStateStream(Consumer<Power.AppLayerPowerState> callback){
        asyncPowerManagementServiceStub.getAppLayerPowerStateStream(Empty.newBuilder().build(), new StreamObserver<Power.AppLayerPowerState>() {

            @Override
            public void onNext(Power.AppLayerPowerState appLayerPowerState) {
                // 调用传入的回调方法
                if (callback != null) {
                    callback.accept(appLayerPowerState);
                }
            }

            @Override
            public void onError(Throwable throwable) {
                log.error("流式获取电源状态信息异常", throwable);
                retry();
            }

            @Override
            public void onCompleted() {
                log.error("服务端终止上送电源状态");
                retry();
            }

            private void retry() {
                log.info("尝试重新启动流式获取电源状态信息...");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    log.error(e.getMessage(), e);
                    Thread.currentThread().interrupt();
                }
                getAppLayerPowerStateStream(callback);
            }
        });
    }

}

4. 可能存在的问题

  1. 长连接无感知连接断开问题解决

该问题出现在 grpc 客户端调用,服务端流式返回。如果未设置超时时间,即使服务端网络断开,客户端仍无法感知。

解决方案是配置 keepalive,可以让客户端知道服务端不可达,从而进行重试或断开连接,详见 GrpcClientConfig.java

  1. 超时时间设置

阻塞式grpc调用,需要配置超时时间,使用 withDeadlineAfter 配置超时时间,详见 GrpcClientConfig.java

  1. gRPC自带重试机制失败后的处理

即使 grpc 配置重试,重试指定次数后抛出异常,此后不再重试。

解决方案是业务层配置重试兜底,详见 RobotPowerClient.java

  1. 网络断开再重连数据一次性收到所有返回

作为服务端来说,推送数据即使网络断开了,由于 grpc 基于 tcp,重试机制会一直尝试发送没有送达的数据包。所以当网络恢复,会有大量堆积的数据推送到客户端。

解决方案是服务端配置 keepalive,,一旦发现客户端不可达,立即关闭连接,等待客户端再次发起请求。

AD:【腾讯云服务器大降价】2核4G 222元/3年 1核2G 38元/年
正文完
 0
阿蛮君
版权声明:本站原创文章,由 阿蛮君 于2025-03-03发表,共计6062字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)
Copyright © 2023-2025 阿蛮君博客 湘ICP备2023001393号
本网站由 亿信互联 提供云计算服务 | 又拍云CDN 提供安全防护和加速服务
Powered by Wordpress  Theme by Puock