【Java】多发送端多文件网络传输(初版)
多文件多发送端网络传输:顾名思义,就是多个发送端通过网络通信同时给一个接收端发送文件。
为什么说多文件?就那一个Java项目来说,要完成项目发送,就需要发送许多.java或.class或者一些配置文件,这些文件常常相互依存。
如何实现?采用TCP协议
实现逻辑图如下:
对于文件接收方来说: 他需要接收来自多个发送方发送的文件,所以,要以接收方为服务器,让多发送方连接他。
对于文件发送方来说:他只要负责去发送他所需要发送的文件就好。
目录
UnreceivedBlockRecord 未接收的文件块记录
IResourceReceiveProgress 进度条接口
ResourceReceiveTopProgress 任务弹窗
文件发送,文件发送,当然要有“文件”,所以咱先从发送方和接收方都需要的公共类出发
基础类:
ReceiveFileModel 接收文件
对于要接收方要接收的文件,需要知晓其文件长度,以及文件需保存到的位置,还有保存未接收到的“文件块”信息。
为什么要分为相对路径和绝对路径呢?对于一个项目来说,其文件所在包(文件夹)是规定好的,这点就算发送到接收端也是不能改变的,这是相对路径。但,该项目保存的位置接收端和发送端可以不同,这是绝对路径。例如:发送端:E:\\sender\\Test\\test.java 接收端只需保证Test\\test,java一致即可
当然,因为这样,当ReceiveFileModel比较时,无须比较其绝对路径
package com.funyoo.fileTransmission.core;
import java.util.Objects;
/**
* 文件信息类<br>
* 文件长度
* 相对路径:文件路径
* 绝对路径:考虑到不同根目录
* 未接收部分
* @author funyoo
*/
public class ReceiveFileModel {
private long length; // 文件长度
private String filePath; // 相对路径
private String fileAbsolutePath; // 绝对路径
private UnreceivedBlockRecord unreceivedBlockRecord; // 未接收文件块
public ReceiveFileModel(long length, String fileAbsolutePath, String filePath) {
this.length = length;
this.filePath = filePath;
this.fileAbsolutePath = fileAbsolutePath;
unreceivedBlockRecord = new UnreceivedBlockRecord(length);
}
public boolean equals(String filePath) {
return this.filePath.equals(filePath);
}
/**
* 重写equals方法<br>
* 比较文件是否相等不必比较绝对路径
* @param o
* @return
*/
@Override
public boolean equals(Object o) {
if (this == o) return true;
System.out.println(getClass());
System.out.println("o.getClass : " + o.getClass());
if (o == null || getClass() != o.getClass()) return false;
ReceiveFileModel receiveFileModel = (ReceiveFileModel) o;
return length == receiveFileModel.length &&
Objects.equals(filePath, receiveFileModel.filePath);
}
@Override
public int hashCode() {
return Objects.hash(length, filePath);
}
String getFilePath() {
return filePath;
}
String getFileAbsolutePath() {
return fileAbsolutePath;
}
public long getLength() {
return length;
}
UnreceivedBlockRecord getUnreceivedBlockRecord() {
return unreceivedBlockRecord;
}
@Override
public String toString() {
return "ReceiveFileModel{" +
"length=" + length +
", filePath='" + filePath + '\'' +
", fileAbsolutePath='" + fileAbsolutePath + '\'' +
'}';
}
}
上面说到“文件块”,文件块就是:组成每个fileModel的“单位”。每个文件由多个“文件块”组成,对于文件收发来说,发送端一块一块发文件,接收端则一块一块收文件。
ResourceBlock 文件块
对于接收到的每一块文件块来说,我当然需要知道
哪个文件?用文件id来区分
哪块数据?用偏移量和长度定位
什么数据?字节数组
同时文件块还完成了保存文件的功能,即,收到一块保存一块
这里采用了线程池来保存文件块,防止频繁创建和销毁线程造成的资源浪费。
package com.funyoo.fileTransmission.core;
import com.funyoo.fileTransmission.view.IResourceReceiveProgress;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 文件块:<br>
* @author funyoo
*/
public class ResourceBlock implements Runnable {
private int fileId; // 文件编号
private long offset; // 偏移量
private int length; // 文件块长度
private byte[] content; // 文件块
private ReceiveFileSet receiveFileSet; // 接收文件集合
private ThreadPoolExecutor threadPool; // 线程池
private IResourceReceiveProgress receiveProgress;
ResourceBlock(ReceiveFileSet receiveFileSet, ThreadPoolExecutor threadPool,
IResourceReceiveProgress receiveProgress) {
this.receiveFileSet = receiveFileSet;
this.threadPool = threadPool;
this.receiveProgress = receiveProgress;
}
int getFileId() {
return fileId;
}
ResourceBlock setFileId(int fileId) {
this.fileId = fileId;
return this;
}
long getOffset() {
return offset;
}
ResourceBlock setOffset(long offset) {
this.offset = offset;
return this;
}
long getLength() {
return length;
}
ResourceBlock setLength(int length) {
this.length = length;
return this;
}
ResourceBlock setContent(byte[] content) {
this.content = content;
return this;
}
/**
* 启动保存文件线程
*/
public void startWriteBlock() {
threadPool.execute(this);
}
/**
* 保存文件
* @param receiveFileModel
* @throws IOException
* @throws FileNoFunnd
*/
private void writeBlock(ReceiveFileModel receiveFileModel) throws IOException, FileNoFunnd {
if (receiveFileModel == null) {
throw new FileNoFunnd("文件号[" + fileId + "]不存在");
}
RandomAccessFile raf = new RandomAccessFile(
receiveFileModel.getFileAbsolutePath() + receiveFileModel.getFilePath(),
"rwd");
raf.seek(offset);
raf.write(content);
raf.close();
if (receiveProgress != null) {
receiveProgress.receiveOneBlock(fileId, content.length);
}
}
/**
* 保存文件线程
*/
@Override
public void run() {
ReceiveFileModel receiveFile = receiveFileSet.getFile(fileId);
try {
writeBlock(receiveFile);
receiveFile.getUnreceivedBlockRecord().receiveBlock(this);
} catch (IOException e) {
e.printStackTrace();
} catch (FileNoFunnd fileNoFunnd) {
fileNoFunnd.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
}
}
}
上个类提到了文件集合,对于多文件多发送端传输而言,接收端有其需要的接收文件信息的集合,发送端有其需要发送文件信息的的集合,这两个集合的相同文件需要通过公有的属性联系起来。
ReceiveFileSet 文件集合
这其中的map用来存储文件编号与文件的一一对应关系表
package com.funyoo.fileTransmission.core;
import java.util.*;
/**
* 接收文件集合<br>
* 该类存储着文件编号和文件的对应关系
* @author funyoo
*/
public class ReceiveFileSet {
private Map<Integer, ReceiveFileModel> fileMap;
private long totalReceiveBytes;
public ReceiveFileSet() {
fileMap = new HashMap<>();
}
long getTotalReceiveBytes() {
return totalReceiveBytes;
}
/**
* 通过文件编号获得文件
*
* @param fileId
* @return
*/
ReceiveFileModel getFile(int fileId) {
return fileMap.get(fileId);
}
/**
* 通过文件路径查找文件编号
*
* @param path
* @return
*/
int getFileIdByPath(String path) {
for (int fileId : fileMap.keySet()) {
ReceiveFileModel receiveFileModel = fileMap.get(fileId);
if (receiveFileModel.equals(path))
return fileId;
}
return -1;
}
/**
* 增加文件
*
* @param fileId
* @param receiveFileModel
*/
public void addReceiveFile(int fileId, ReceiveFileModel receiveFileModel) throws ReceiveFileAlreadyExist {
ReceiveFileModel file = fileMap.get(fileId);
if (file != null) {
throw new ReceiveFileAlreadyExist("文件编号[" + fileId + "]重复!");
}
totalReceiveBytes += receiveFileModel.getLength();
fileMap.put(fileId, receiveFileModel);
}
public int getTotalReceiveFiles() {
return fileMap.size();
}
}
当然,发送方发送的文件也需要定义
SendFileModel 发送文件
package com.funyoo.fileTransmission.core;
/**
* 发送的文件
* @author funyoo
*/
public class SendFileModel {
private String fileAbsolutePath; // 文件绝对路径
private String filePath; // 文件相对路径
private int length; // 发送长度
private long offset; // 发送偏移量
public SendFileModel(String fileAbsolutePath, String filePath, long offset, int length) {
this.fileAbsolutePath = fileAbsolutePath;
this.filePath = filePath;
this.length = length;
this.offset = offset;
}
SendFileModel() {
}
String getFileAbsolutePath() {
return fileAbsolutePath;
}
void setFileAbsolutePath(String fileAbsolutePath) {
this.fileAbsolutePath = fileAbsolutePath;
}
String getFilePath() {
return filePath;
}
void setFilePath(String filePath) {
this.filePath = filePath;
}
int getLength() {
return length;
}
void setLength(int length) {
this.length = length;
}
long getOffset() {
return offset;
}
void setOffset(long offset) {
this.offset = offset;
}
}
还有一点未提到,记不记得文件的传输是以文件块为单位传输的,细心的可以发现,每个接收文件类中都包含了一个未接受的文件块对象。我们如何表示文件块记录信息呢?当然是偏移量 + 长度
BlockRecord 文件块记录
package com.funyoo.fileTransmission.core;
/**
* 文件块记录
* @author funyoo
*/
public class BlockRecord {
private long offset;
private long length;
BlockRecord(long offset, long length) {
this.offset = offset;
this.length = length;
}
long getOffset() {
return offset;
}
void setOffset(long offset) {
this.offset = offset;
}
long getLength() {
return length;
}
void setLength(long length) {
this.length = length;
}
}
UnreceivedBlockRecord 未接收的文件块记录
每接收并一个文件块的时候都需要将未接受到的文件块记录列表更新
如何更新?请看代码后附图
package com.funyoo.fileTransmission.core;
import java.util.LinkedList;
import java.util.List;
/**
* 未接收到的文件块记录<br>
* 为以后实现的断点续传做准备
*/
public class UnreceivedBlockRecord {
private List<BlockRecord> blockList; // 未接受到的块列表
/**
* 根据文件长度初始化记录表的长度
* @param fileLength
*/
UnreceivedBlockRecord(long fileLength) {
blockList = new LinkedList<>();
blockList.add(new BlockRecord(0, fileLength));
}
/**
* 定位块位置
* @param curOffset
* @return
* @throws Exception
*/
private int getTheBlock(long curOffset) throws Exception {
for (int index = 0; index < blockList.size(); index++) {
BlockRecord record = blockList.get(index);
if (curOffset <= record.getOffset() + record.getLength()) {
return index;
}
}
throw new Exception("块编号未找到:" + curOffset);
}
/**
* 接收到文件块,更新列表操作
* @param curBlock
* @throws Exception
*/
synchronized void receiveBlock(ResourceBlock curBlock) throws Exception {
long curOffset = curBlock.getOffset();
long curLength = curBlock.getLength();
int orgBlockIndex = getTheBlock(curOffset);
BlockRecord orgBlock = blockList.get(orgBlockIndex);
long orgOffset = orgBlock.getOffset();
long orgLength = orgBlock.getLength();
long leftOffset = orgOffset;
long leftLength = curOffset - orgOffset;
long rightOffset = curOffset + curLength;
long rightLength = orgOffset + orgLength - rightOffset;
blockList.remove(orgBlock);
if (rightLength > 0)
blockList.add(orgBlockIndex, new BlockRecord(rightOffset, rightLength));
if (leftLength > 0)
blockList.add(orgBlockIndex, new BlockRecord(leftOffset, leftLength));
}
}
这个记录列表有什么用?为以后未实现的断点续传功能打基础。
至此,基础类就告一段落了。
下面从构建接收方服务器这边开始:
接收方:
ResourceReceiveServer 接收端服务器
接收端服务器需要做什么?1.侦听多发送端连接 2.启动服务器 3.关闭服务器
接收方每接收到一个发送端的连接请求,就会生成一个接收者ResourceReceiver与该发送端通信,待侦听到计划的所有的发送端,开始判断所有接收者是否都完成任务(接受 和 保存),这里进行接收完毕的操作(关闭线程池等)。
这里同时提供了进度条的选项,使用者可以使用默认进度条。
package com.funyoo.fileTransmission.core;
import com.funyoo.fileTransmission.view.IResourceReceiveProgress;
import javax.swing.*;
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.*;
/**
* 接收端服务器:<br>
* 接受文件方应建立服务器<br>
* 以接受多个发送端发送文件<br>
* @author funyoo
*/
public class ResourceReceiveServer implements Runnable {
private ServerSocket receiver; // 文件接收端
private int port; // 接收端端口
private Object lock; // 锁
private boolean continueWaittingSender; // 判断是否继续侦听标志
private ReceiveFileSet receiveFileSet; // 需要接收的文件集合
private ThreadPoolExecutor threadPool; // 线程池
private int senderCount;
private IResourceReceiveProgress receiveProgress;
public ResourceReceiveServer() {
this(0, null);
}
public ResourceReceiveServer(int port, IResourceReceiveProgress receiveProgress) {
threadPool = new ThreadPoolExecutor(50, 100,
500, TimeUnit.MICROSECONDS, new LinkedBlockingDeque<>());
this.port = port;
this.receiveProgress = receiveProgress;
}
public void setPort(int port) {
this.port = port;
}
public void setReceiveFileSet(ReceiveFileSet receiveFileSet) {
this.receiveFileSet = receiveFileSet;
if (receiveProgress != null) {
receiveProgress.setSenderPlan(
receiveFileSet.getTotalReceiveFiles(),
receiveFileSet.getTotalReceiveBytes());
}
}
public void setSenderCount(int senderCount) {
this.senderCount = senderCount;
}
/**
* 启动接收端服务器
*/
public void startup() {
try {
receiver = new ServerSocket(port);
continueWaittingSender = true;
Thread thread = new Thread(this,"Receiver");
thread.start();
// 确保侦听线程已经启动
synchronized (ResourceReceiveServer.class) {
ResourceReceiveServer.class.wait();
}
if (receiveProgress != null) {
receiveProgress.startShowProgress();
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 关闭接收端服务器
*/
private void shutdown() {
if (receiver != null && !receiver.isClosed()) {
try {
receiver.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
receiver = null;
}
}
}
/**
* 文件传输线程
* 每侦听到一个发送端的连接,处理其与接受端的对话
*/
@Override
public void run() {
int currentSenderCount = 0;
synchronized (ResourceReceiveServer.class) {
ResourceReceiveServer.class.notify();
}
if (receiveProgress != null) {
boolean receiveProgressIsShow = false;
while (!receiveProgressIsShow) {
receiveProgressIsShow = ((JDialog) receiveProgress).isActive();
}
}
while (continueWaittingSender && currentSenderCount < senderCount) {
try {
Socket sender = receiver.accept();
if (receiveProgress != null) {
String senderInfo = sender
.getInetAddress()
.getHostName();
receiveProgress.acceptOneSender(senderInfo);
}
// 开始接收文件
new ResourceReceiver(sender, receiveFileSet, threadPool, receiveProgress);
currentSenderCount++;
} catch (IOException e) {
e.printStackTrace();
continueWaittingSender = false;
}
}
boolean threadIsAllFinished = threadPool.getActiveCount() > 0;
while (threadIsAllFinished) {
threadIsAllFinished = threadPool.getActiveCount() > 0;
}
threadPool.shutdown();
if (receiveProgress != null) {
receiveProgress.finishedReceive();
}
shutdown();
}
}
需要解释的是,在启动侦听线程的时候加了一个锁,该锁的目的是保证侦听线程的代码在启动服务器方法结束前执行。避免启动服务器后操作因为侦听代码未真正执行起来而出现错误。
进度条需外界设置,若为null,则不进行进度条呢的操作。
ResourceReceiver 接收者
这个类用来处理和其对应发送端的通信,其主要功能是接收文件和保存文件。
接收文件:先接收由发送端发来的16字节文件头部信息,解析信息,根据解析的文件信息,创建对应的长度的“容器”(block),接收对应长度的文件内容,并保存。
当接收到文件编号为-1时,停止接收。
保存文件:见ResourceBlock类
package com.funyoo.fileTransmission.core;
import com.funyoo.fileTransmission.util.ByteAndString;
import com.funyoo.fileTransmission.view.IResourceReceiveProgress;
import java.io.DataInputStream;
import java.io.IOException;
import java.net.Socket;
import java.util.concurrent.ThreadPoolExecutor;
/**
* 处理与发送端的对话<br>
* @author funyoo
*/
public class ResourceReceiver implements Runnable {
private Socket sender; // 与之通信的发送端
private DataInputStream dis; // 输入信道
private ReceiveFileSet receiveFileSet; // 需要接受的文件集合
private ThreadPoolExecutor threadPool; // 线程池
private volatile Object lock; // 锁
private IResourceReceiveProgress receiveProgress; // 接收进度条
static final ThreadLocal threadFileId
= new ThreadLocal<Integer>() {
@Override
protected Integer initialValue() {
return -1;
}
};
ResourceReceiver(Socket sender, ReceiveFileSet receiveFileSet,
ThreadPoolExecutor threadPool, IResourceReceiveProgress receiveProgress) {
this.sender = sender;
this.receiveFileSet = receiveFileSet;
this.threadPool = threadPool;
this.receiveProgress = receiveProgress;
lock = new Object();
try {
dis = new DataInputStream(sender.getInputStream());
threadPool.execute(this);
synchronized (lock) {
lock.wait();
}
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
/**
* 接收文件
*/
private boolean receiverOneBlock() throws IOException {
byte[] header = receiveBytes(16, -2);
// 解析字节信息
int fileId = ByteAndString.getIntAt(header, 0);
long offset = ByteAndString.getLongAt(header, 4);
int length = ByteAndString.getIntAt(header, 12);
if (fileId == -1) {
return true;
}
if (receiveProgress != null) {
int oldFileId = (int) threadFileId.get();
if (oldFileId != fileId) {
threadFileId.set(fileId);
ReceiveFileModel rfm = receiveFileSet.getFile(fileId);
String fileName = rfm.getFilePath();
int fileLength = (int) rfm.getLength();
receiveProgress.receiveNewFile(fileId, fileName, fileLength);
}
}
ResourceBlock block = new ResourceBlock(receiveFileSet, threadPool, receiveProgress);
// 接收文件内容
byte[] content = receiveBytes(length, fileId);
block.setFileId(fileId)
.setOffset(offset)
.setLength(length)
.setContent(content);
// 保存文件
block.startWriteBlock();
return false;
}
/**
* 接收文件内容
* @param length
* @return
*/
private byte[] receiveBytes(int length, int fileId) throws IOException {
byte[] content = new byte[length];
int offset = 0;
int realReceiveLength = 0;
while (length > 0) {
realReceiveLength = dis.read(content, offset, length);
length -= realReceiveLength;
offset += realReceiveLength;
if (receiveProgress != null && fileId != -2) {
receiveProgress.receiveOneBlock(fileId, realReceiveLength);
}
}
return content;
}
/**
* 接收文件线程
*/
@Override
public void run() {
boolean finished = false;
synchronized (lock) {
lock.notify();
}
while (!finished) {
// 接收文件
try {
finished = receiverOneBlock();
} catch (IOException e) {
e.printStackTrace();
stop();
}
}
}
/**
* 关闭
*/
private void stop() {
if (sender != null || !sender.isClosed()) {
try {
sender.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
sender = null;
}
}
if (dis != null) {
try {
dis.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
dis = null;
}
}
}
}
说到这里,不得不提到字节工具,发送端需要将自己的文件信息“编辑”成16字节,接收者需要解析这16字节。
ByteAndString 字节工具
package com.funyoo.fileTransmission.util;
/**
* 字节工具
* @author funyoo
*/
public class ByteAndString {
public static final String hex = "0123456789ABCDEF";
/**
* 转化速度
* @param speed
* @return
*/
public static String bytesToKMG(long speed) {
StringBuffer str = new StringBuffer();
if (speed < 1024) {
return String.valueOf(speed);
}
if (speed < (1 << 20)) {
str.append(speed >> 10)
.append('.')
.append(String.valueOf((speed & 0x03FF) + 1000).substring(1))
.append('K');
return str.toString();
}
if (speed < (1 << 30)) {
str.append(speed >> 20)
.append('.')
.append(String.valueOf(((speed >> 10) & 0x03FF) + 1000).substring(1))
.append('M');
return str.toString();
}
if (speed < (1 << 40)) {
str.append(speed >> 30)
.append('.')
.append(String.valueOf(((speed >> 20) & 0x03FF) + 1000).substring(1))
.append('G');
return str.toString();
}
return String.valueOf(speed);
}
/**
* 显示将byte数组
* @param buffer
* @return
*/
public static String toHex(byte[] buffer) {
StringBuffer str = new StringBuffer();
for (int i = 0; i < buffer.length; i++) {
byte bt = buffer[i];
str.append(i == 0? "" : " ")
.append(hex.charAt((bt >> 4) & 0x0F))
.append(hex.charAt(bt & 0x0F));
}
return str.toString();
}
/**
* 将int类型的值转换为byte数组
* @param buffer
* @param offset
* @param value
*/
public static void setIntAt(byte[] buffer, int offset, int value) {
buffer[offset + 0] = (byte) ((value >> 24) & 0x00FF);
buffer[offset + 1] = (byte) ((value >> 16) & 0x00FF);
buffer[offset + 2] = (byte) ((value >> 8) & 0x00FF);
buffer[offset + 3] = (byte) (value & 0x00FF);
}
/**
* 从byte数组中得到int类型的值
* @param buffer
* @param offset
* @return
*/
public static int getIntAt(byte[] buffer, int offset) {
int value = 0;
value |= (buffer[offset + 0] << 24) & 0xFF000000;
value |= (buffer[offset + 1] << 16) & 0x00FF0000;
value |= (buffer[offset + 2] << 8) & 0x0000FF00;
value |= (buffer[offset + 3] << 0) & 0x000000FF;
return value;
}
/**
* 将long类型的值转换为byte数组
* @param buffer
* @param offset
* @param value
*/
public static void setLongAt(byte[] buffer, int offset, long value) {
buffer[offset + 0] = (byte) ((value >> 56) & 0x000F);
buffer[offset + 1] = (byte) ((value >> 48) & 0x000F);
buffer[offset + 2] = (byte) ((value >> 40) & 0x000F);
buffer[offset + 3] = (byte) ((value >> 32) & 0x000F);
buffer[offset + 4] = (byte) ((value >> 24) & 0x000F);
buffer[offset + 5] = (byte) ((value >> 16) & 0x000F);
buffer[offset + 6] = (byte) ((value >> 8) & 0x000F);
buffer[offset + 7] = (byte) (value & 0x000F);
}
/**
* 从byte数组中得到long类型的值
* @param buffer
* @param offset
* @return
*/
public static long getLongAt(byte[] buffer, int offset) {
long value = 0;
value |= (buffer[offset + 0] << 56) & 0xFF00000000000000L;
value |= (buffer[offset + 1] << 48) & 0x00FF000000000000L;
value |= (buffer[offset + 2] << 40) & 0x0000FF0000000000L;
value |= (buffer[offset + 3] << 32) & 0x000000FF00000000L;
value |= (buffer[offset + 4] << 24) & 0x00000000FF000000L;
value |= (buffer[offset + 5] << 16) & 0x0000000000FF0000L;
value |= (buffer[offset + 6] << 8) & 0x000000000000FF00L;
value |= (buffer[offset + 7] << 0) & 0x00000000000000FFL;
return value;
}
}
发送方:
ResourceSender 发送者
package com.funyoo.fileTransmission.core;
import com.funyoo.fileTransmission.util.ByteAndString;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.RandomAccessFile;
import java.net.Socket;
import java.util.List;
/**
* 文件发送类<br>
* 负责与文件接收方通信
* @author funyoo
*/
public class ResourceSender implements Runnable {
private Socket sender; // 发送端
private ReceiveFileSet receiveFileSet; // 接收端需接收文件集合
private List<SendFileModel> sendFileList; // 发送端发送文件的列表
private DataOutputStream dos; // 发送信道
private Object lock; // 锁
public static final int BUFFER_SIZE = 1 << 15; // 每次发送的大小
/**
* 生成与接收端通信的发送端发送者<br>
* 并直接启动发送线程发送发送任务
* @param sender
* @param receiveFileSet
* @param sendFileList
* @throws IOException
*/
ResourceSender(Socket sender, ReceiveFileSet receiveFileSet,
List<SendFileModel> sendFileList) throws IOException {
this.sender = sender;
this.receiveFileSet = receiveFileSet;
this.sendFileList = sendFileList;
dos = new DataOutputStream(sender.getOutputStream());
new Thread(this, "Sender").start();
synchronized (ResourceSender.class) {
try {
ResourceSender.class.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
/**
* 发送线程<br>
* 对于每个文件,先发送包含其文件信息的文件头部
* 再发送文件本身
* 待所有文件都发送完毕,再发送一遍文件头(文件编号为-1)作为停止标志
*/
@Override
public void run() {
synchronized (ResourceSender.class) {
ResourceSender.class.notify();
}
byte[] header = new byte[16];
byte[] buffer = new byte[BUFFER_SIZE];
for (SendFileModel sendFile : sendFileList) {
String fileAbsolutePath = sendFile.getFileAbsolutePath();
String filePath = sendFile.getFilePath();
int fileId = receiveFileSet.getFileIdByPath(filePath);
long offset = sendFile.getOffset();
int length = sendFile.getLength();
System.out.println("发送文件 " + filePath + " " + "编号" + fileId);
ByteAndString.setIntAt(header, 0, fileId);
ByteAndString.setLongAt(header, 4, offset);
ByteAndString.setIntAt(header, 12, length);
try {
dos.write(header);
dos.flush();
RandomAccessFile raf = new RandomAccessFile(fileAbsolutePath + filePath, "r");
raf.seek(offset);
int realReadLen = 0;
int len = 0;
while (length > 0) {
realReadLen = length >= BUFFER_SIZE? BUFFER_SIZE : length;
len = raf.read(buffer, 0, realReadLen);
length -= len;
dos.write(buffer, 0, len);
}
raf.close();
} catch (IOException e) {
e.printStackTrace();
stop();
break;
}
}
ByteAndString.setIntAt(header, 0, -1);
if (dos != null) {
try {
dos.write(header);
dos.flush();
} catch (IOException e) {
e.printStackTrace();
}
}
stop();
}
/**
* 关闭,停止发送
*/
private void stop() {
if (sender != null || !sender.isClosed()) {
try {
sender.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
sender = null;
}
}
if (dos != null) {
try {
dos.close();
} catch (IOException e) {
e.printStackTrace();
} finally {
dos = null;
}
}
}
}
以上是文件发送者,需要一个文件发送中心,用来整合发送发需要发送的文件,启动文件发送任务
ResourceSenderCenter 发送中心
package com.funyoo.fileTransmission.core;
import java.io.IOException;
import java.net.Socket;
import java.util.List;
/**
* 负责整合文件资源并启动文件发送任务
* @author funyoo
*/
public class ResourceSenderCenter {
private Socket sender; // 发送方
private int receiverPort; // 接收端端口
private String receiverIp; // 接收端Ip
private ReceiveFileSet sendFileSet; // 接收端需要接受的文件集合
private List<SendFileModel> sendFileList; // 此发送端发送的文件列表
public ResourceSenderCenter(String receiverIp, int receiverPort) {
this.receiverIp = receiverIp;
this.receiverPort = receiverPort;
}
public void setSendFileSet(ReceiveFileSet sendFileSet) {
this.sendFileSet = sendFileSet;
}
public void setSendFileList(List<SendFileModel> sendFileList) {
this.sendFileList = sendFileList;
}
/**
* 发送文件
* @throws IOException
*/
public void startSend() throws IOException {
if (sendFileSet == null) {
// 无文件发送
return;
}
sender = new Socket(receiverIp, receiverPort);
new ResourceSender(sender, sendFileSet, sendFileList);
}
}
进度条:
关于进度条,这个是使用者可选的操作。
IResourceReceiveProgress 进度条接口
package com.funyoo.fileTransmission.view;
import java.awt.*;
/**
* 接收端进度条接口
* @author funyoo
*/
public interface IResourceReceiveProgress {
Font topicFont = new Font("微软雅黑", Font.BOLD, 30);
Font normalFont = new Font("宋体", Font.PLAIN, 16);
Font importantFont = new Font("黑体", Font.BOLD, 16);
int normalFontSize = normalFont.getSize();
int topicFontSize = topicFont.getSize() + 4;
Color topicColor = new Color(6, 7, 9);
Color titleColor = new Color(42, 114, 167);
Color importantColor = new Color(23, 156, 41);
int RECEIVE_PROGRESS_WIDTH = 400;
int RECEIVE_PROGRESS_HEIGHT = 50;
int PROGRESS_MIN_HEIGHT = 320;
int PADDING = 5;
long MIN_TIME_FOR_CUR_SPEED = 250;
long MIN_TIME_FOR_TOT_SPEED = 500;
/**
* 设置发送计划
* @param receiveFileCount
* @param byteCount
*/
void setSenderPlan(int receiveFileCount, long byteCount);
/**
* 设置发送端列表
* @param senderCount
*/
void setSenderInfo(int senderCount);
/**
* 显示进度条
*/
void startShowProgress();
/**
* 接收到一个发送端
* @param sender
*/
void acceptOneSender(String sender);
/**
* 接收新文件
* @param fileId
* @param fileName
* @param fileLength
*/
void receiveNewFile(int fileId, String fileName, int fileLength);
/**
* 接收文件块
* @param fileId
* @param length
*/
void receiveOneBlock(int fileId, int length);
/**
* 接收完成
*/
void finishedReceive();
}
ResourceReceiveTopProgress 任务弹窗
package com.funyoo.fileTransmission.view;
import com.funyoo.fileTransmission.util.ByteAndString;
import javax.swing.*;
import java.awt.*;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
/**
* 任务弹窗<br>
* 实现进度条接口
* 设置进度条各更新方法
*/
public class ResourceReceiveTopProgress extends JDialog
implements IResourceReceiveProgress, Runnable{
private Map<Integer, FileReceiveProgressPanel> fileReceiveMap; // 存储进度条,每个文件编号对应一个进度条
private Container container;
private JLabel jlblTopic;
private JLabel jlblReceivePlanFile;
private JLabel jlblReceivePlanSender;
private JLabel jlblReceiveAction;
private JLabel jlblCurrSpeed;
private JLabel jlblTotalSpeed;
private FileReceiveProgressPanel frppSender; // 发送端进度条
private FileReceiveProgressPanel frppFiles; // 文件进度条
private volatile int receiveFileCount;
private volatile int currentReceiveFileCount;
private volatile long startTime;
private volatile long lastTime;
private volatile long lastReceiveBytes;
private volatile long currentReceiveBytes;
private volatile boolean goon;
private volatile Object lock;
/**
* 初始化接收窗口,准备接收
* @param owner
* @param topic
*/
public ResourceReceiveTopProgress(Frame owner, String topic) {
super(owner, topic, true);
fileReceiveMap = new ConcurrentHashMap<>();
lock = new Object();
container = getContentPane();
container.setLayout(new GridLayout(0, 1));
setSize(RECEIVE_PROGRESS_WIDTH, PROGRESS_MIN_HEIGHT);
setLocationRelativeTo(owner);
jlblTopic = new JLabel(topic, JLabel.CENTER);
jlblTopic.setFont(topicFont);
jlblTopic.setForeground(topicColor);
container.add(jlblTopic);
// 接收计划
JPanel jpnlReceivePlan = new JPanel(new GridLayout(3, 1));
container.add(jpnlReceivePlan);
JLabel jlblReceivePlanTitle = new JLabel("本次接收计划", JLabel.CENTER);
jlblReceivePlanTitle.setFont(normalFont);
jlblReceivePlanTitle.setForeground(titleColor);
jpnlReceivePlan.add(jlblReceivePlanTitle);
jlblReceivePlanFile = new JLabel("本次共接收F个文件,共B字节。", JLabel.LEFT);
jlblReceivePlanFile.setFont(normalFont);
jpnlReceivePlan.add(jlblReceivePlanFile);
jlblReceivePlanSender = new JLabel("共S发送端。", JLabel.LEFT);
jlblReceivePlanSender.setFont(normalFont);
jpnlReceivePlan.add(jlblReceivePlanSender);
// 发送端进度
frppSender = new FileReceiveProgressPanel("发送端:", "", 1);
container.add(frppSender);
// 当前进行的接收动作
jlblReceiveAction = new JLabel("尚未确定接收任务", 0);
jlblReceiveAction.setFont(importantFont);
jlblReceiveAction.setForeground(importantColor);
container.add(jlblReceiveAction);
// 文件接收进度
frppFiles = new FileReceiveProgressPanel("接收文件:", "0/1", 1);
container.add(frppFiles);
JPanel jpnlSpeed = new JPanel(new GridLayout(1, 2));
container.add(jpnlSpeed);
jlblCurrSpeed = new JLabel("字节/秒");
jlblCurrSpeed.setFont(normalFont);
jpnlSpeed.add(jlblCurrSpeed);
jlblTotalSpeed = new JLabel("字节/秒");
jlblTotalSpeed.setFont(normalFont);
jpnlSpeed.add(jlblTotalSpeed);
}
/**
* 接收到一个发送端
* @param sender
*/
@Override
public void acceptOneSender(String sender) {
if (startTime == 0) {
startTime = System.currentTimeMillis();
synchronized (lock) {
lock.notify();
}
}
frppSender.receiveOneDelta(1);
frppSender.setContext(frppSender.getContext() + " " + sender);
jlblReceiveAction.setText("接入一个发送者:" + sender);
}
/**
* 设置发送计划
* @param receiveFileCount
* @param byteCount
*/
@Override
public void setSenderPlan(int receiveFileCount, long byteCount) {
String planContext = jlblReceivePlanFile.getText();
planContext = planContext.replace("F", String.valueOf(receiveFileCount));
planContext = planContext.replace("B", String.valueOf(byteCount));
jlblReceivePlanFile.setText(planContext);
this.receiveFileCount = receiveFileCount;
this.currentReceiveFileCount = 0;
frppFiles.setContext(currentReceiveFileCount + "/" + receiveFileCount);
jlblReceiveAction.setText("已确定发送任务计划!");
}
/**
* 设置发送端数量
* @param senderCount
*/
@Override
public void setSenderInfo(int senderCount) {
String planContext = jlblReceivePlanSender.getText();
planContext = planContext.replace("S", String.valueOf(senderCount));
jlblReceivePlanSender.setText(planContext);
frppFiles.setMaxValue(receiveFileCount);
}
/**
* 接收新文件
* @param fileId
* @param fileName
* @param fileLength
*/
@Override
synchronized public void receiveNewFile(int fileId, String fileName, int fileLength) {
FileReceiveProgressPanel frpp = fileReceiveMap.get(fileId);
if (frpp == null) {
frpp = new FileReceiveProgressPanel("接收", fileName, fileLength);
fileReceiveMap.put(fileId, frpp);
setSize(RECEIVE_PROGRESS_WIDTH, getHeight() + RECEIVE_PROGRESS_HEIGHT);
container.add(frpp);
currentReceiveFileCount++;
frppFiles.setContext(currentReceiveFileCount + "/" + receiveFileCount);
frppFiles.receiveOneDelta(1);
}
}
/**
* 接收文件块
* @param fileId
* @param length
*/
@Override
synchronized public void receiveOneBlock(int fileId, int length) {
FileReceiveProgressPanel frpp = fileReceiveMap.get(fileId);
if (frpp != null && frpp.receiveOneDelta(length)) {
container.remove(frpp);
fileReceiveMap.remove(fileId, frpp);
setSize(RECEIVE_PROGRESS_WIDTH, getHeight() - RECEIVE_PROGRESS_HEIGHT);
}
getReceiveSpeed(length);
}
/**
* 计算并更新文件发送速度
* @param length
*/
private void getReceiveSpeed(int length) {
currentReceiveBytes += length;
long currentTime = System.currentTimeMillis();
if (lastTime == 0) {
lastTime = currentTime;
jlblReceiveAction.setText("开始接收文件……");
return;
}
// 计算接收速度
long deltaTime = currentTime - lastTime;
if (deltaTime > MIN_TIME_FOR_CUR_SPEED) {
// 计算间隔时间
long deltaByte = currentReceiveBytes - lastReceiveBytes;
long curSpeed = deltaByte * 1000 / deltaTime;
jlblCurrSpeed.setText("瞬时速度: "
+ ByteAndString.bytesToKMG(curSpeed)
+ "B/秒");
lastTime = currentTime;
lastReceiveBytes = currentReceiveBytes;
}
}
/**
* 完成接收,关闭弹出的进度条菜单
*/
@Override
public void finishedReceive() {
this.dispose();
goon = false;
}
/**
* 侦听到发送端,开始显示进度条,启动计算平均时间线程
*/
@Override
public void startShowProgress() {
synchronized (lock) {
try {
new Thread(this).start();
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
this.setVisible(true);
}
@Override
public void run() {
synchronized (lock) {
try {
lock.notify();
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
goon = true;
while (goon) {
synchronized (Class.class) {
try {
Class.class.wait(MIN_TIME_FOR_TOT_SPEED);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
long currentTime = System.currentTimeMillis();
long totalSpeed = currentReceiveBytes * 1000 /
(currentTime - startTime);
jlblTotalSpeed.setText("平均速度:"
+ ByteAndString.bytesToKMG(totalSpeed)
+ "B/秒");
}
}
}
FileReceiveProgressPanel 进度条
package com.funyoo.fileTransmission.view;
import javax.swing.*;
import java.awt.*;
/**
* 进度条样式
* @author funyoo
*/
public class FileReceiveProgressPanel extends JPanel {
private JLabel jlblContext; // 标签
private JProgressBar jpgbBar; // 进度条
private JLabel jlblFileNameCaption; // 文件名标签
private int count; // 进度条总大小
private int currentCount; // 当前大小
public FileReceiveProgressPanel(String caption, String context, int count) {
this.count = count;
this.currentCount = 0;
this.setLayout(new GridLayout(2, 1));
JPanel jpnlCaption = new JPanel();
add(jpnlCaption);
jlblFileNameCaption = new JLabel(caption);
jlblFileNameCaption.setFont(IResourceReceiveProgress.normalFont);
jpnlCaption.add(jlblFileNameCaption);
jlblContext = new JLabel(context);
jlblContext.setFont(IResourceReceiveProgress.normalFont);
jpnlCaption.add(jlblContext);
jpgbBar = new JProgressBar();
jpgbBar.setFont(IResourceReceiveProgress.normalFont);
jpgbBar.setMaximum(this.count);
jpgbBar.setValue(currentCount);
jpgbBar.setStringPainted(true);
add(jpgbBar);
}
void setMaxValue(int count) {
jpgbBar.setMaximum(count);
}
void setContext(String context) {
jlblContext.setText(context);
}
String getContext() {
return jlblContext.getText();
}
void setCaption(String caption) {
jlblFileNameCaption.setText(caption);
}
/**
* 更新进度条
* @param delta
* @return
*/
boolean receiveOneDelta(int delta) {
currentCount += delta;
jpgbBar.setValue(currentCount);
return currentCount >= count;
}
}
测试:
package com.funyoo.fileTransmission.demo;
import com.funyoo.fileTransmission.core.ReceiveFileModel;
import com.funyoo.fileTransmission.core.ReceiveFileSet;
import com.funyoo.fileTransmission.core.ResourceReceiveServer;
import com.funyoo.fileTransmission.view.IResourceReceiveProgress;
import com.funyoo.fileTransmission.view.ResourceReceiveTopProgress;
import javax.swing.*;
import java.awt.*;
import java.awt.event.ActionEvent;
import java.awt.event.ActionListener;
/**
* 使用者使用样例<br>
* 定义窗口事件,点击开始接收文件需要的文件
*/
public class TestReceiveProgress {
private JFrame jfrmMainView;
private Container container;
private JButton jbtnOk;
private ReceiveFileSet fileSet;
public TestReceiveProgress() {
init();
dealAction();
}
void init() {
jfrmMainView = new JFrame("文件传输");
jfrmMainView.setSize(500, 400);
jfrmMainView.setLocationRelativeTo(null);
jfrmMainView.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
container = jfrmMainView.getContentPane();
container.setLayout(null);
jbtnOk = new JButton("开始");
jbtnOk.setFont(new Font("宋体", Font.PLAIN, 14));
jbtnOk.setBounds(200, 300, 65, 40);
container.add(jbtnOk);
// 需要接收的文件集合即ReceiveFileSet需要由需求方统一提供
// 经过中间服务器分配,或,负载均衡等策略发放给各发送端
String absolutePath = "F:\\";
try {
String filePath = "Tomb Raider 2018_8_21 17_09_00.mp4";
int length = 77314520;
fileSet = new ReceiveFileSet();
ReceiveFileModel file = new ReceiveFileModel(
length, absolutePath, filePath);
fileSet.addReceiveFile(1, file);
} catch (Exception e) {
e.printStackTrace();
}
}
void dealAction() {
jbtnOk.addActionListener(new ActionListener() {
@Override
public void actionPerformed(ActionEvent e) {
IResourceReceiveProgress rrp = new ResourceReceiveTopProgress(
jfrmMainView, " 接收文件 ");
ResourceReceiveServer rrs
= new ResourceReceiveServer(54147, rrp);
rrs.setReceiveFileSet(fileSet);
rrs.setSenderCount(1);
rrs.startup();
}
});
}
public void showView() {
jfrmMainView.setVisible(true);
}
void closeView() {
jfrmMainView.dispose();
}
}
package com.funyoo.fileTransmission.demo;
public class TestReceiver {
public static void main(String[] args) {
new TestReceiveProgress().showView();
}
}
package com.funyoo.fileTransmission.demo;
import com.funyoo.fileTransmission.core.*;
import java.io.IOException;
import java.util.LinkedList;
import java.util.List;
public class TestSender {
public static void main(String[] args){
ResourceSenderCenter senderCenter = new ResourceSenderCenter("localhost", 54147);
// 这里ReceiveFileSet需要从中间服务器接收,即,该发送端的发送任务
ReceiveFileSet sendFileSet = new ReceiveFileSet();
ReceiveFileModel receiveFile = new ReceiveFileModel(77314520, "C:\\Users\\funyoo\\Videos\\Captures\\",
"Tomb Raider 2018_8_21 17_09_00.mp4");
try {
sendFileSet.addReceiveFile(1, receiveFile);
} catch (ReceiveFileAlreadyExist receiveFileAlreadyExist) {
receiveFileAlreadyExist.printStackTrace();
}
senderCenter.setSendFileSet(sendFileSet);
// 根据发送任务寻找文件,组成list发送
SendFileModel file = new SendFileModel("C:\\Users\\funyoo\\Videos\\Captures\\",
"Tomb Raider 2018_8_21 17_09_00.mp4", 0, 77314520);
List<SendFileModel> sendFileList = new LinkedList<>();
sendFileList.add(file);
senderCenter.setSendFileList(sendFileList);
try {
senderCenter.startSend();
} catch (IOException e) {
e.printStackTrace();
}
}
}
总结:
以上代码单单完成了文件传输功能。
做了断点续传的准备工作。
还应采用中间服务器分配及下达任务,以完成负载均衡等策略。