共计 6986 个字符,预计需要花费 18 分钟才能阅读完成。
1. 引入依赖以及插件
依赖包如下:
<properties>
<grpc.version>1.69.0</grpc.version>
<protoc.version>3.25.3</protoc.version>
<grpc-java.version>1.69.0</grpc-java.version>
</properties>
<!-- gRPC protobuf 依赖 -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-protobuf</artifactId>
<version>${grpc.version}</version>
</dependency>
<!-- gRPC Stub 依赖 -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-stub</artifactId>
<version>${grpc.version}</version>
</dependency>
<!-- grpc-netty-shaded -->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty-shaded</artifactId>
<version>${grpc.version}</version>
</dependency>
maven 插件:
<plugins>
<!-- protobuf 插件,需要时打开即可 -->
<plugin>
<groupId>org.xolstice.maven.plugins</groupId>
<artifactId>protobuf-maven-plugin</artifactId>
<version>0.6.1</version>
<configuration>
<protocArtifact>com.google.protobuf:protoc:${protoc.version}:exe:${os.detected.classifier}</protocArtifact>
<pluginId>grpc-java</pluginId>
<pluginArtifact>io.grpc:protoc-gen-grpc-java:${grpc-java.version}:exe:${os.detected.classifier}</pluginArtifact>
</configuration>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>compile-custom</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
<extensions>
<extension>
<groupId>kr.motd.maven</groupId>
<artifactId>os-maven-plugin</artifactId>
<version>1.7.1</version>
</extension>
</extensions>
-
compile
负责生成 Protobuf 消息类,用于数据序列化和反序列化。XXX.java
(Protobuf 消息类):对应.proto
文件中的message
定义,作为数据传输对象 (DTO)。XXXOrBuilder.java
(接口):用于支持 Protobuf 生成的类的Builder
模式。
-
compile-custom
作用是调用 protoc-gen-grpc-java 插件,生成 gRPC 相关的 Java 代码。XXXGrpc.java
(gRPC 服务类):包含 gRPC 服务器和客户端的存根 (stub)。XXXGrpc.XxxBlockingStub
(同步调用的客户端存根)XXXGrpc.XxxFutureStub
(异步调用的客户端存根)XXXGrpc.XxxStub
(非阻塞调用的客户端存根)XXXGrpc.XXXImplBase
(服务器端基类,服务端需要继承该类并实现具体逻辑)
2. 准备proto文件
power.proto
syntax = "proto3";
option java_multiple_files = false;
option java_package = "com.unipower.robot.server.component.grpc.power";
package powerManagement;
import "google/protobuf/empty.proto";
// 设备设置项
message DeviceSetting {
string device = 1; // 设备名称
bool status = 2; // 状态
bool necessary = 3; // 是否必要
}
// 设备电源控制请求
message DevicePowerControlRequest {
// 设备控制指令。
repeated DeviceSetting settings = 1;
}
// 设备电源控制响应
message DevicePowerControlResponse {
// 状态码。
// 成功:200;失败:500。
int32 code = 1;
// 操作结果消息。
// 成功:操作成功;失败:操作失败,失败原因
string message = 2;
}
message AppLayerPowerState {
// 电池数据
float voltage = 1;
float current = 2;
float rsoc = 3;
float nominalCapacity = 4;
uint32 cycleCount = 5;
// ----
// N1数据
bool shutdownSignal = 6;
bool buzzer = 7;
bool motor = 8;
// ----
// N2数据
bool appLayerCamera = 9;
bool radar = 10;
// ----
// 数据采集时间戳
uint64 timestamp = 11;
// 可变部分
string status = 12;
}
// L1E AppLayerPowerState.status JSON format
// "sideCharging": bool // 是否通过侧边充电中
// "bottomCharging": bool // 是否通过底部充电中
// "bottomChargeEnabled": bool // 底部充电开关状态
// "SC": bool // SC电源开关状态
// "controlBoardFan": bool // 控制板散热风扇开关状态
// "appBoardFan": bool // 应用板散热风扇开关状态
// "wiperFan": bool // 雨刷风扇开关状态
// "emergencyStopSignal": bool // 急停信号
// "alarmLight": bool // 告警灯开关状态
// 电源管理服务定义
service PowerManagementService {
// 控制设备电源状态
rpc SetDevicePowerState (DevicePowerControlRequest) returns (DevicePowerControlResponse);
// 获取电源状态状态
rpc GetAppLayerPowerStateOnce (google.protobuf.Empty) returns (AppLayerPowerState);
rpc GetAppLayerPowerStateStream (google.protobuf.Empty) returns (stream AppLayerPowerState);
}
然后将 proto 文件放在 src/main/proto
目录下即可。
如果 proto 文件比较稳定,可以用插件编译后,将 target 目录下插件生成的代码直接拷贝到项目中,以免每次启动或者打包都重新编译 proto 文件。
3. 配置以及使用
配置 gRPC 客户端的连接。
GrpcClientConfig.java
@Configuration
public class GrpcClientConfig {
@Value("${power.grpcAddress}")
private String powerGrpcAddress;
@Value("${power.grpcPort}")
private int powerGrpcPort;
// 发送心跳间隔(最低10s)
private static final int KEEP_ALIVE_TIME = 10;
// 心跳超时时间,单位s(设置了超过20s后不生效)
private static final int KEEP_ALIVE_TIME_OUT = 5;
@Bean
public ManagedChannel powerChannel() {
// 由于启用重试配置没看见效果所以没有配置
return ManagedChannelBuilder.forAddress(powerGrpcAddress, powerGrpcPort)
.usePlaintext()
.keepAliveTime(KEEP_ALIVE_TIME, TimeUnit.SECONDS)
.keepAliveTimeout(KEEP_ALIVE_TIME_OUT, TimeUnit.SECONDS)
.build();
}
/**
* 同步阻塞的存根
* @param powerChannel
* @return
*/
@Bean
public PowerManagementServiceGrpc.PowerManagementServiceBlockingStub powerManagementServiceBlockingStub(ManagedChannel powerChannel) {
return PowerManagementServiceGrpc.newBlockingStub(powerChannel);
}
/**
* 异步非阻塞的存根
* @param powerChannel
* @return
*/
@Bean
public PowerManagementServiceGrpc.PowerManagementServiceStub powerManagementServiceStub(ManagedChannel powerChannel) {
return PowerManagementServiceGrpc.newStub(powerChannel);
}
}
RobotPowerClient.java
@Slf4j
@Component
public class RobotPowerClient {
@Autowired
PowerManagementServiceGrpc.PowerManagementServiceBlockingStub powerManagementServiceBlockingStub;
@Autowired
PowerManagementServiceGrpc.PowerManagementServiceStub asyncPowerManagementServiceStub;
/**
* 设置各设备的电源状态
* @param deviceSettings
* @return
*/
public boolean applyDeviceSettings(Power.DeviceSetting... deviceSettings){
log.debug("调用 grpc 接口设置设备状态参数:{}", deviceSettings);
Power.DevicePowerControlResponse response = powerManagementServiceBlockingStub
.withDeadlineAfter(30, TimeUnit.SECONDS)
.setDevicePowerState(Power.DevicePowerControlRequest.newBuilder()
.addAllSettings(Arrays.asList(deviceSettings))
.build()
);
log.debug("调用 grpc 接口设置设备状态结果:{}", response);
return response.getCode() == ErrorCode.OK.getCode();
}
/**
* 流式获取电源状态信息
* @param callback
*/
public void getAppLayerPowerStateStream(Consumer<Power.AppLayerPowerState> callback){
asyncPowerManagementServiceStub.getAppLayerPowerStateStream(Empty.newBuilder().build(), new StreamObserver<Power.AppLayerPowerState>() {
@Override
public void onNext(Power.AppLayerPowerState appLayerPowerState) {
// 调用传入的回调方法
if (callback != null) {
callback.accept(appLayerPowerState);
}
}
@Override
public void onError(Throwable throwable) {
log.error("流式获取电源状态信息异常", throwable);
retry();
}
@Override
public void onCompleted() {
log.error("服务端终止上送电源状态");
retry();
}
private void retry() {
log.info("尝试重新启动流式获取电源状态信息...");
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
log.error(e.getMessage(), e);
Thread.currentThread().interrupt();
}
getAppLayerPowerStateStream(callback);
}
});
}
}
4. 可能存在的问题
- 长连接无感知连接断开问题解决
该问题出现在 grpc 客户端调用,服务端流式返回。如果未设置超时时间,即使服务端网络断开,客户端仍无法感知。
解决方案是配置 keepalive,可以让客户端知道服务端不可达,从而进行重试或断开连接,详见 GrpcClientConfig.java
。
-
KEEP_ALIVE_TIME
:发送心跳的间隔,java 的 grpc 包中设置该值最小为 10 秒。 -
KEEP_ALIVE_TIME_OUT
:keepalive 超时时间,如果超过该时间,则认为连接不可达。(测试中发现该值最大为20s)
需要注意的是,keepalive 是在连接没有发送数据后发送心跳包,所以从断开到发现断开的时间为 KEEP_ALIVE_TIME
+ KEEP_ALIVE_TIME_OUT
。
- 超时时间设置
阻塞式grpc调用,需要配置超时时间,使用 withDeadlineAfter
配置超时时间,详见 RobotPowerClient.java
。
这里需要注意的是,需要在每次使用 stub 进行调用时,才需要配置超时时间。如果在 GrpcClientConfig.java
配置,则程序启动时间超过指定时间以后进行调用就会提示超时。
- gRPC连接断开后的处理
由于测试 grpc 的重试机制并没有生效,所以目前都是业务层配置重试兜底,详见 RobotPowerClient.java
。
- 网络断开再重连数据一次性收到所有返回
作为服务端来说,推送数据即使网络断开了,由于 grpc 基于 tcp,重试机制会一直尝试发送没有送达的数据包。所以当网络恢复,会有大量堆积的数据推送到客户端。
解决方案是服务端配置 keepalive,,一旦发现客户端不可达,立即关闭连接,等待客户端再次发起请求。
这里如果客户端发送 ping 包时间间隔太小,可能导致 UNAVAILABLE: Too many pings
报错,解决方案在 python 作为服务端的情况下是调整相关间隔。
def serve():
server = grpc.server(
futures.ThreadPoolExecutor(max_workers=10),
options=[
('grpc.keepalive_time_ms', 5000), # 允许每 5 秒发送一次 KeepAlive
('grpc.keepalive_timeout_ms', 5000), # 5 秒内未响应则认为连接断开
('grpc.keepalive_permit_without_calls', True), # 即使没有 RPC 也允许 KeepAlive
('grpc.http2.max_pings_without_data', 0), # 允许不带数据的 PING
('grpc.http2.min_time_between_pings_ms', 1000), # 允许的最小 PING 发送间隔
('grpc.http2.min_ping_interval_without_data_ms', 1000), # 无数据时 PING 最小间隔
]
)
#...