您当前的位置: 首页 生活 > > 正文

Java Websocket 02: 原生模式通过 Websocket 传输文件 世界速看料

2023-06-19 09:27:19 来源:博客园 分享到:
目录Java Websocket 01: 原生模式 Websocket 基础通信Java Websocket 02: 原生模式通过 Websocket 传输文件Websocket 原生模式 传输文件

关于 Websocket 传输的消息类型, 允许的参数包括以下三类

以下类型之一, 同时只能出现一个文本类型 (text messages) 的消息: String, Java primitive, 阻塞的 Stream Reader, 带text decoder(Decoder.Text or Decoder.TextStream)的对象二进制类型 (binary messages) 的消息: byte[] 或 ByteBuffer, 阻塞的 InputStream, 带 binary decoder (Decoder.Binary or Decoder.BinaryStream)的对象Pong messages: PongMessage通过 PathParam 指定的0个或多个基础类型会话参数 Session, 可选

因此对于不同的消息类型, 可以有不同参数类型的 onMessage() 方法, 分别用于处理不同格式的内容, 对于传输文件, 需要使用 ByteBuffer 类型的参数

void onMessage(ByteBuffer byteBuffer, Session session)

在处理过程中和普通的文件传输是一样的, 需要将文件分片传输, 并约定合适的消息头用于判断文件传输的阶段, 在服务端根据不同的阶段进行文件创建, 写入和结束.


【资料图】

演示项目

与前一篇项目结构相同, 只需要修改 SocketServer 和 SocketClient

完整示例代码: https://github.com/MiltonLai/websocket-demos/tree/main/ws-demo02

SocketServer.java

增加了 onMessage(ByteBuffer byteBuffer, Session session)方法用于处理二进制消息, 在方法中

先读取第一个字节的值, 根据不同的值对应不同的操作1 表示文件传输前的准备3 表示文件内容写入5 表示文件结束再读取后续的值1 解析出文件元信息, 并创建文件通道3 将内容写入文件5 关闭文件通道, 清除buffer回传ACK1 ACK 23 不ACK5 ACK 6
@Component@ServerEndpoint("/websocket/server/{sessionId}")public class SocketServer {    //...    @OnMessage    public void onMessage(ByteBuffer byteBuffer, Session session) throws IOException {        if (byteBuffer.limit() == 0) {            return;        }        byte mark = byteBuffer.get(0);        if (mark == 1) {            log.info("mark 1");            byteBuffer.get();            String info = new String(                    byteBuffer.array(),                    byteBuffer.position(),                    byteBuffer.limit() - byteBuffer.position());            FileInfo fileInfo = new JsonMapper().readValue(info, FileInfo.class);            byteChannel = Files.newByteChannel(                    Path.of("D:/data/" + fileInfo.getFileName()),                    new StandardOpenOption[]{StandardOpenOption.CREATE, StandardOpenOption.WRITE});            //ack            ByteBuffer buffer = ByteBuffer.allocate(4096);            buffer.put((byte) 2);            buffer.put("receive fileinfo".getBytes(StandardCharsets.UTF_8));            buffer.flip();            session.getBasicRemote().sendBinary(buffer);        } else if (mark == 3) {            log.info("mark 3");            byteBuffer.get();            byteChannel.write(byteBuffer);        } else if (mark == 5) {            log.info("mark 5");            //ack            ByteBuffer buffer = ByteBuffer.allocate(4096);            buffer.clear();            buffer.put((byte) 6);            buffer.put("receive end".getBytes(StandardCharsets.UTF_8));            buffer.flip();            session.getBasicRemote().sendBinary(buffer);            byteChannel.close();            byteChannel = null;        }    }    //...    public static class FileInfo implements Serializable {        private String fileName;        private long fileSize;        public String getFileName() {return fileName;}        public void setFileName(String fileName) {this.fileName = fileName;}        public long getFileSize() {return fileSize;}        public void setFileSize(long fileSize) {this.fileSize = fileSize;}    }}
SocketClient.java

client 测试类, 连接后可以在命令行向 server 发送消息

首先是消息处理中增加了 void onMessage(ByteBuffer bytes), 这个是用来接收服务端回传的ACK的, 根据第一个字节, 判断服务端的处理结果. 这里使用了一个 condition.notify()用来通知发送线程继续发送

其次是消息发送中, 用输入的1触发文件发送. 文件发送在 void sendFile(WebSocketClient webSocketClient, Object condition)方法中进行, 通过一个 condition 对象, 在文件开始传输和结束传输时控制线程的暂停和继续. byteBuffer.flip()用于控制 byteBuffer 从状态变为状态, 用于发送. flip is used to flip the ByteBuffer from "reading from I/O" (putting) to "writing to I/O" (getting).

public class SocketClient {    private static final Logger log = LoggerFactory.getLogger(SocketClient.class);    public static void main(String[] args) throws URISyntaxException, IOException, InterruptedException {        Object condition = new Object();        WebSocketClient wsClient = new WebSocketClient(new URI("ws://127.0.0.1:8763/websocket/server/10001")) {            //...            @Override            public void onMessage(ByteBuffer bytes) {                //To overwrite                byte mark = bytes.get(0);                if (mark == 2) {                    synchronized (condition) {                        condition.notify();                    }                    log.info("receive ack for file info");                } else if (mark == 6){                    synchronized (condition) {                        condition.notify();                    }                    log.info("receive ack for file end");                }            }            @Override            public void onClose(int i, String s, boolean b) {                log.info("On close: {}, {}, {}", i, s, b);            }            @Override            public void onError(Exception e) {                log.error("On error: {}", e.getMessage());            }        };        wsClient.connect();        log.info("Connecting ...");        while (!ReadyState.OPEN.equals(wsClient.getReadyState())) {        }        log.info("Connected");        Scanner scanner = new Scanner(System.in);        while (scanner.hasNext()) {            String line = scanner.next();            if ("1".equals(line))                sendFile(wsClient, condition);            else                wsClient.send(line);        }    }    public static void sendFile(WebSocketClient webSocketClient, Object condition){        new Thread(() -> {            try {                SeekableByteChannel byteChannel = Files.newByteChannel(                        Path.of("/home/milton/Backup/linux/apache-tomcat-8.5.58.tar.gz"),                        new StandardOpenOption[]{StandardOpenOption.READ});                ByteBuffer byteBuffer = ByteBuffer.allocate(4*1024);                byteBuffer.put((byte)1);                String info = "{\"fileName\": \"greproto.tar.gz\", \"fileSize\":"+byteChannel.size()+"}";                byteBuffer.put(info.getBytes(StandardCharsets.UTF_8));                byteBuffer.flip();                webSocketClient.send(byteBuffer);                synchronized (condition) {                    condition.wait();                }                byteBuffer.clear();                byteBuffer.put((byte)3);                while (byteChannel.read(byteBuffer) > 0) {                    byteBuffer.flip();                    webSocketClient.send(byteBuffer);                    byteBuffer.clear();                    byteBuffer.put((byte)3);                }                byteBuffer.clear();                byteBuffer.put((byte)5);                byteBuffer.put("end".getBytes(StandardCharsets.UTF_8));                byteBuffer.flip();                webSocketClient.send(byteBuffer);                synchronized (condition) {                    condition.wait();                }                byteChannel.close();            } catch (InterruptedException|IOException e) {                log.error(e.getMessage(), e);            }        }).start();    }}
运行示例

示例是一个普通的 Spring Boot jar项目, 可以通过mvn clean package进行编译, 再通过java -jar ws-demo01.jar运行, 启动后工作在8763端口

将 SocketClient.java 中的文件路径 D:/WorkJava/tmp/greproto.tar.gz换成自己本地的文件路径, 运行 SocketClient, 可以观察到服务端接收到的消息. 如果输入1并回车, 就会触发客户端往服务端传输文件

参考https://stackoverflow.com/questions/14792968/what-is-the-purpose-of-bytebuffers-flip-method-and-why-is-it-called-flip

关键词:

x 广告

Copyright   2015-2022 全球超市网版权所有  备案号:豫ICP备20009784号-11   联系邮箱:85 18 07 48 3@qq.com