|
|
@@ -0,0 +1,135 @@
|
|
|
+package cn.vbdsm.hj212.socket.transport;
|
|
|
+
|
|
|
+import cn.vbdsm.hj212.socket.config.ClientProperties;
|
|
|
+import cn.vbdsm.hj212.socket.driver.IMessage;
|
|
|
+import cn.vbdsm.hj212.socket.driver.OneByOneMessage;
|
|
|
+import cn.vbdsm.hj212.socket.exception.RequestExecuteException;
|
|
|
+import io.netty.bootstrap.Bootstrap;
|
|
|
+import io.netty.buffer.PooledByteBufAllocator;
|
|
|
+import io.netty.channel.Channel;
|
|
|
+import io.netty.channel.ChannelFuture;
|
|
|
+import io.netty.channel.ChannelInitializer;
|
|
|
+import io.netty.channel.ChannelOption;
|
|
|
+import io.netty.channel.nio.NioEventLoopGroup;
|
|
|
+import io.netty.channel.socket.nio.NioSocketChannel;
|
|
|
+import io.netty.util.concurrent.DefaultThreadFactory;
|
|
|
+import lombok.Getter;
|
|
|
+import lombok.extern.slf4j.Slf4j;
|
|
|
+
|
|
|
+import java.util.concurrent.TimeUnit;
|
|
|
+
|
|
|
+@Slf4j
|
|
|
+public class OneByOneClient {
|
|
|
+ private static final int DEFAULT_IO_THREADS = Math.min(Runtime.getRuntime().availableProcessors() + 1, 32);
|
|
|
+ private static final NioEventLoopGroup nioEventLoopGroup = new NioEventLoopGroup(DEFAULT_IO_THREADS,
|
|
|
+ new DefaultThreadFactory("NettyClientWorker", true));
|
|
|
+ private final ClientProperties properties;
|
|
|
+ private final ICodec codec;
|
|
|
+ @Getter
|
|
|
+ private Bootstrap bootstrap;
|
|
|
+ @Getter
|
|
|
+ private Channel channel;
|
|
|
+
|
|
|
+ public OneByOneClient(ICodec codec, String remoteIp, int remotePort) {
|
|
|
+ this.properties = ClientProperties.defaultProperties(remoteIp, remotePort);
|
|
|
+ this.codec = codec;
|
|
|
+ }
|
|
|
+
|
|
|
+ public OneByOneClient(ClientProperties properties, ICodec codec) {
|
|
|
+ this.properties = properties;
|
|
|
+ this.codec = codec;
|
|
|
+ }
|
|
|
+
|
|
|
+ private boolean run() {
|
|
|
+ bootstrap = new Bootstrap();
|
|
|
+ bootstrap.group(nioEventLoopGroup)
|
|
|
+ .option(ChannelOption.SO_KEEPALIVE, true)
|
|
|
+ .option(ChannelOption.TCP_NODELAY, true)
|
|
|
+ .option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
|
|
|
+ .channel(NioSocketChannel.class);
|
|
|
+
|
|
|
+ if (properties.getTimeout() < 3000) {
|
|
|
+ bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 3000);
|
|
|
+ } else {
|
|
|
+ bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, properties.getTimeout());
|
|
|
+ }
|
|
|
+
|
|
|
+ OneByOneNettyClientHandler clientHandler = new OneByOneNettyClientHandler();
|
|
|
+
|
|
|
+ bootstrap.handler(new ChannelInitializer<NioSocketChannel>() {
|
|
|
+ @Override
|
|
|
+ protected void initChannel(NioSocketChannel ch) throws Exception {
|
|
|
+ NettyCodecAdapter adapter = new NettyCodecAdapter(codec);
|
|
|
+ ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug
|
|
|
+ .addLast("decoder", adapter.getInternalDecoder())
|
|
|
+ .addLast("encoder", adapter.getInternalEncoder())
|
|
|
+ .addLast("handler", clientHandler);
|
|
|
+ }
|
|
|
+ });
|
|
|
+
|
|
|
+ // bind
|
|
|
+ ChannelFuture channelFuture = null;
|
|
|
+ if (!"0.0.0.0".equals(properties.getBindIp()) && properties.getBindPort() != -1) {
|
|
|
+ bootstrap.bind(properties.getBindIp(), properties.getBindPort());
|
|
|
+ }
|
|
|
+ channelFuture = bootstrap.connect(properties.getRemoteIp(), properties.getRemotePort());
|
|
|
+ log.info("client starting ....");
|
|
|
+ channelFuture.awaitUninterruptibly(properties.getTimeout(), TimeUnit.MILLISECONDS);
|
|
|
+ if (channelFuture.isSuccess() && channelFuture.cause() == null) {
|
|
|
+ channel = channelFuture.channel();
|
|
|
+ log.info("client has started, remote:" + properties.getRemoteIp() + "," + "port:" + properties.getRemotePort());
|
|
|
+ return true;
|
|
|
+ }
|
|
|
+ channelFuture.cancel(true);
|
|
|
+ if (channelFuture.channel() != null) {
|
|
|
+ channelFuture.channel().close().syncUninterruptibly();
|
|
|
+ }
|
|
|
+ return false;
|
|
|
+ }
|
|
|
+
|
|
|
+ public boolean test() {
|
|
|
+ try {
|
|
|
+ boolean flag = run();
|
|
|
+ return flag;
|
|
|
+ } catch (Exception ex) {
|
|
|
+ log.error("connect error !", ex);
|
|
|
+ return false;
|
|
|
+ } finally {
|
|
|
+ stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ public IMessage sendMessage(IMessage request) {
|
|
|
+ try {
|
|
|
+ if (run()) {
|
|
|
+ if (channel != null && channel.isActive()) {
|
|
|
+ ChannelFuture channelFuture = channel.writeAndFlush(request).syncUninterruptibly();
|
|
|
+ if (channelFuture.cause() == null) {
|
|
|
+ OneByOneMessage response = new OneByOneMessage();
|
|
|
+ channel.attr(OneByOneNettyClientHandler.key).set(response);
|
|
|
+ response.getLatch().await(properties.getReadoutTimeout(), TimeUnit.MILLISECONDS);
|
|
|
+ if (response.getResponse() == null) {
|
|
|
+ throw new RequestExecuteException("请求超时");
|
|
|
+ }
|
|
|
+ return response.getResponse();
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+ throw new RequestExecuteException("无法打开连接");
|
|
|
+ } catch (Exception ex) {
|
|
|
+ log.error("send request error", ex);
|
|
|
+ if (ex instanceof RequestExecuteException) {
|
|
|
+ throw (RequestExecuteException) ex;
|
|
|
+ }
|
|
|
+ throw new RequestExecuteException("request error", ex);
|
|
|
+ } finally {
|
|
|
+ stop();
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ private void stop() {
|
|
|
+ if (channel != null && channel.isActive()) {
|
|
|
+ channel.close().awaitUninterruptibly();
|
|
|
+ }
|
|
|
+ }
|
|
|
+}
|