Java 作为客户端 grpc 使用

395次阅读
没有评论

共计 6986 个字符,预计需要花费 18 分钟才能阅读完成。

1. 引入依赖以及插件

依赖包如下:

<properties>
    <grpc.version>1.69.0</grpc.version>
    <protoc.version>3.25.3</protoc.version>
    <grpc-java.version>1.69.0</grpc-java.version>
</properties>

<!-- gRPC protobuf 依赖 -->
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-protobuf</artifactId>
    <version>${grpc.version}</version>
</dependency>

<!-- gRPC Stub 依赖 -->
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-stub</artifactId>
    <version>${grpc.version}</version>
</dependency>

<!-- grpc-netty-shaded -->
<dependency>
    <groupId>io.grpc</groupId>
    <artifactId>grpc-netty-shaded</artifactId>
    <version>${grpc.version}</version>
</dependency>

maven 插件:

<plugins>
    <!-- protobuf 插件,需要时打开即可 -->
    <plugin>
        <groupId>org.xolstice.maven.plugins</groupId>
        <artifactId>protobuf-maven-plugin</artifactId>
        <version>0.6.1</version>
        <configuration>
            <protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
            <pluginId>grpc-java</pluginId>
            <pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc-java.version}:exe:${os.detected.classifier}</pluginArtifact>
        </configuration>
        <executions>
            <execution>
                <goals>
                    <goal>compile</goal>
                    <goal>compile-custom</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
</plugins>

<extensions>
    <extension>
        <groupId>kr.motd.maven</groupId>
        <artifactId>os-maven-plugin</artifactId>
        <version>1.7.1</version>
    </extension>
</extensions>

Java 作为客户端 grpc 使用

  • compile 负责生成 Protobuf 消息类,用于数据序列化和反序列化。

    • XXX.java(Protobuf 消息类):对应 .proto 文件中的 message 定义,作为数据传输对象 (DTO)。
    • XXXOrBuilder.java(接口):用于支持 Protobuf 生成的类的 Builder 模式。
  • compile-custom 作用是调用 protoc-gen-grpc-java 插件,生成 gRPC 相关的 Java 代码。

    • XXXGrpc.java(gRPC 服务类):包含 gRPC 服务器和客户端的存根 (stub)。
    • XXXGrpc.XxxBlockingStub(同步调用的客户端存根)
    • XXXGrpc.XxxFutureStub(异步调用的客户端存根)
    • XXXGrpc.XxxStub(非阻塞调用的客户端存根)
    • XXXGrpc.XXXImplBase(服务器端基类,服务端需要继承该类并实现具体逻辑)

2. 准备proto文件

power.proto

syntax = "proto3";

option java_multiple_files = false;
option java_package = "com.unipower.robot.server.component.grpc.power";

package powerManagement;

import "google/protobuf/empty.proto";

// 设备设置项
message DeviceSetting {
  string device = 1;    // 设备名称
  bool status = 2;      // 状态
  bool necessary = 3;   // 是否必要
}

// 设备电源控制请求
message DevicePowerControlRequest {

  // 设备控制指令。
  repeated DeviceSetting settings = 1;
}

// 设备电源控制响应
message DevicePowerControlResponse {

  // 状态码。
  // 成功:200;失败:500。
  int32 code = 1;

  // 操作结果消息。
  // 成功:操作成功;失败:操作失败,失败原因
  string message = 2;
}

message AppLayerPowerState {
  // 电池数据
  float voltage = 1;
  float current = 2;
  float rsoc = 3;
  float nominalCapacity = 4;
  uint32 cycleCount = 5;
  // ----

  // N1数据
  bool shutdownSignal = 6;
  bool buzzer = 7;
  bool motor = 8;
  // ----

  // N2数据
  bool appLayerCamera = 9;
  bool radar = 10;
  // ----

  // 数据采集时间戳
  uint64 timestamp = 11;

  // 可变部分
  string status = 12;
}

// L1E AppLayerPowerState.status JSON format
// "sideCharging": bool // 是否通过侧边充电中
// "bottomCharging": bool // 是否通过底部充电中
// "bottomChargeEnabled": bool // 底部充电开关状态
// "SC": bool // SC电源开关状态
// "controlBoardFan": bool // 控制板散热风扇开关状态
// "appBoardFan": bool // 应用板散热风扇开关状态
// "wiperFan": bool // 雨刷风扇开关状态
// "emergencyStopSignal": bool // 急停信号
// "alarmLight": bool // 告警灯开关状态

// 电源管理服务定义
service PowerManagementService {
  // 控制设备电源状态
  rpc SetDevicePowerState (DevicePowerControlRequest) returns (DevicePowerControlResponse);

  // 获取电源状态状态
  rpc GetAppLayerPowerStateOnce (google.protobuf.Empty) returns (AppLayerPowerState);
  rpc GetAppLayerPowerStateStream (google.protobuf.Empty) returns (stream AppLayerPowerState);
}

然后将 proto 文件放在 src/main/proto 目录下即可。

如果 proto 文件比较稳定,可以用插件编译后,将 target 目录下插件生成的代码直接拷贝到项目中,以免每次启动或者打包都重新编译 proto 文件。

3. 配置以及使用

配置 gRPC 客户端的连接。

GrpcClientConfig.java

@Configuration
public class GrpcClientConfig {

    @Value("${power.grpcAddress}")
    private String powerGrpcAddress;

    @Value("${power.grpcPort}")
    private int powerGrpcPort;

    // 发送心跳间隔(最低10s)
    private static final int KEEP_ALIVE_TIME = 10;

    // 心跳超时时间,单位s(设置了超过20s后不生效)
    private static final int KEEP_ALIVE_TIME_OUT = 5;

    @Bean
    public ManagedChannel powerChannel() {
        // 由于启用重试配置没看见效果所以没有配置
        return ManagedChannelBuilder.forAddress(powerGrpcAddress, powerGrpcPort)
                .usePlaintext()
                .keepAliveTime(KEEP_ALIVE_TIME, TimeUnit.SECONDS)
                .keepAliveTimeout(KEEP_ALIVE_TIME_OUT, TimeUnit.SECONDS)
                .build();
    }

    /**
     * 同步阻塞的存根
     * @param powerChannel
     * @return
     */
    @Bean
    public PowerManagementServiceGrpc.PowerManagementServiceBlockingStub powerManagementServiceBlockingStub(ManagedChannel powerChannel) {
        return PowerManagementServiceGrpc.newBlockingStub(powerChannel);
    }

    /**
     * 异步非阻塞的存根
     * @param powerChannel
     * @return
     */
    @Bean
    public PowerManagementServiceGrpc.PowerManagementServiceStub powerManagementServiceStub(ManagedChannel powerChannel) {
        return PowerManagementServiceGrpc.newStub(powerChannel);
    }
}

RobotPowerClient.java

@Slf4j
@Component
public class RobotPowerClient {
    @Autowired
    PowerManagementServiceGrpc.PowerManagementServiceBlockingStub powerManagementServiceBlockingStub;

    @Autowired
    PowerManagementServiceGrpc.PowerManagementServiceStub asyncPowerManagementServiceStub;

    /**
     * 设置各设备的电源状态
     * @param deviceSettings
     * @return
     */
    public boolean applyDeviceSettings(Power.DeviceSetting... deviceSettings){
        log.debug("调用 grpc 接口设置设备状态参数:{}", deviceSettings);
        Power.DevicePowerControlResponse response = powerManagementServiceBlockingStub
                        .withDeadlineAfter(30, TimeUnit.SECONDS)
                        .setDevicePowerState(Power.DevicePowerControlRequest.newBuilder()
                        .addAllSettings(Arrays.asList(deviceSettings))
                        .build()
        );
        log.debug("调用 grpc 接口设置设备状态结果:{}", response);
        return response.getCode() == ErrorCode.OK.getCode();
    }

    /**
     * 流式获取电源状态信息
     * @param callback
     */
    public void getAppLayerPowerStateStream(Consumer<Power.AppLayerPowerState> callback){
        asyncPowerManagementServiceStub.getAppLayerPowerStateStream(Empty.newBuilder().build(), new StreamObserver<Power.AppLayerPowerState>() {

            @Override
            public void onNext(Power.AppLayerPowerState appLayerPowerState) {
                // 调用传入的回调方法
                if (callback != null) {
                    callback.accept(appLayerPowerState);
                }
            }

            @Override
            public void onError(Throwable throwable) {
                log.error("流式获取电源状态信息异常", throwable);
                retry();
            }

            @Override
            public void onCompleted() {
                log.error("服务端终止上送电源状态");
                retry();
            }

            private void retry() {
                log.info("尝试重新启动流式获取电源状态信息...");
                try {
                    Thread.sleep(3000);
                } catch (InterruptedException e) {
                    log.error(e.getMessage(), e);
                    Thread.currentThread().interrupt();
                }
                getAppLayerPowerStateStream(callback);
            }
        });
    }

}

4. 可能存在的问题

  1. 长连接无感知连接断开问题解决

该问题出现在 grpc 客户端调用,服务端流式返回。如果未设置超时时间,即使服务端网络断开,客户端仍无法感知。

解决方案是配置 keepalive,可以让客户端知道服务端不可达,从而进行重试或断开连接,详见 GrpcClientConfig.java

  • KEEP_ALIVE_TIME:发送心跳的间隔,java 的 grpc 包中设置该值最小为 10 秒。

  • KEEP_ALIVE_TIME_OUT:keepalive 超时时间,如果超过该时间,则认为连接不可达。(测试中发现该值最大为20s)

需要注意的是,keepalive 是在连接没有发送数据后发送心跳包,所以从断开到发现断开的时间为 KEEP_ALIVE_TIME + KEEP_ALIVE_TIME_OUT

  1. 超时时间设置

阻塞式grpc调用,需要配置超时时间,使用 withDeadlineAfter 配置超时时间,详见 RobotPowerClient.java

这里需要注意的是,需要在每次使用 stub 进行调用时,才需要配置超时时间。如果在 GrpcClientConfig.java 配置,则程序启动时间超过指定时间以后进行调用就会提示超时。

  1. gRPC连接断开后的处理

由于测试 grpc 的重试机制并没有生效,所以目前都是业务层配置重试兜底,详见 RobotPowerClient.java

  1. 网络断开再重连数据一次性收到所有返回

作为服务端来说,推送数据即使网络断开了,由于 grpc 基于 tcp,重试机制会一直尝试发送没有送达的数据包。所以当网络恢复,会有大量堆积的数据推送到客户端。

解决方案是服务端配置 keepalive,,一旦发现客户端不可达,立即关闭连接,等待客户端再次发起请求。

这里如果客户端发送 ping 包时间间隔太小,可能导致 UNAVAILABLE: Too many pings 报错,解决方案在 python 作为服务端的情况下是调整相关间隔。

def serve():
    server = grpc.server(
        futures.ThreadPoolExecutor(max_workers=10),
        options=[
            ('grpc.keepalive_time_ms', 5000),  # 允许每 5 秒发送一次 KeepAlive
            ('grpc.keepalive_timeout_ms', 5000),  # 5 秒内未响应则认为连接断开
            ('grpc.keepalive_permit_without_calls', True),  # 即使没有 RPC 也允许 KeepAlive
            ('grpc.http2.max_pings_without_data', 0),  # 允许不带数据的 PING
            ('grpc.http2.min_time_between_pings_ms', 1000),  # 允许的最小 PING 发送间隔
            ('grpc.http2.min_ping_interval_without_data_ms', 1000),  # 无数据时 PING 最小间隔
        ]
    )
    #...
AD:【腾讯云服务器大降价】2核4G 222元/3年 1核2G 38元/年
正文完
 0
阿蛮君
版权声明:本站原创文章,由 阿蛮君 于2025-03-03发表,共计6986字。
转载说明:除特殊说明外本站文章皆由CC-4.0协议发布,转载请注明出处。
评论(没有评论)
Copyright © 2023-2025 阿蛮君博客 湘ICP备2023001393号
本网站由 亿信互联 提供云计算服务 | 又拍云CDN 提供安全防护和加速服务
Powered by Wordpress  Theme by Puock