以下是实现大文件上传的详细解析,包括分片上传、断点续传等功能:
- 前端核心实现:
// 文件分片上传类
// Chunked file uploader: slices a File into fixed-size chunks, computes an
// incremental MD5 hash, uploads chunks with resume support, and asks the
// server to merge them. Pause is implemented with AbortController.
class FileUploader {
  constructor(file, options = {}) {
    this.file = file
    this.chunkSize = options.chunkSize || 2 * 1024 * 1024 // 2MB default
    this.chunkCount = Math.ceil(file.size / this.chunkSize)
    this.currentChunk = 0
    this.requestList = [] // AbortControllers of in-flight chunk requests
    this.hashProgress = 0 // 0-100, hash computation progress
    this.uploadProgress = 0 // 0-100, overall upload progress
    this.chunks = null // cached chunk list, set by upload() (enables arg-less resume())
    this.hash = null // cached file hash, set by upload()
  }

  // Compute the file's MD5 incrementally chunk-by-chunk with SparkMD5,
  // updating hashProgress as it goes. Resolves with the hex digest.
  async calculateHash() {
    return new Promise((resolve, reject) => {
      const spark = new SparkMD5.ArrayBuffer()
      const reader = new FileReader()
      const chunks = this.createFileChunks()
      let currentChunk = 0
      // BUG FIX: the original had no onerror, so a read failure hung forever.
      reader.onerror = () => reject(reader.error)
      reader.onload = (e) => {
        spark.append(e.target.result)
        currentChunk++
        this.hashProgress = Math.floor((currentChunk / chunks.length) * 100)
        if (currentChunk < chunks.length) {
          loadNext()
        } else {
          resolve(spark.end())
        }
      }
      const loadNext = () => {
        reader.readAsArrayBuffer(chunks[currentChunk].file)
      }
      loadNext()
    })
  }

  // Slice the file into chunks. Each chunk records its absolute index so a
  // filtered list (resume) still reports the correct position to the server.
  createFileChunks() {
    const chunks = []
    let cur = 0
    let index = 0
    while (cur < this.file.size) {
      chunks.push({
        index, // absolute chunk position — required for resume correctness
        file: this.file.slice(cur, cur + this.chunkSize)
      })
      index++
      cur += this.chunkSize
    }
    return chunks
  }

  // Upload the given chunks in parallel; pause() aborts via the controllers
  // captured here. Resolves when every chunk request has completed.
  async uploadChunks(chunks, hash) {
    this.requestList = []
    const requests = chunks.map((chunk) => {
      const controller = new AbortController()
      this.requestList.push(controller)
      const formData = new FormData()
      formData.append('chunk', chunk.file)
      formData.append('hash', hash)
      // BUG FIX: use the chunk's absolute index, not its position in the
      // (possibly filtered) array — otherwise resumed chunks overwrite
      // the wrong slots on the server.
      formData.append('chunkIndex', chunk.index)
      formData.append('filename', this.file.name)
      return axios.post('/upload', formData, {
        signal: controller.signal, // lets pause() cancel this request
        onUploadProgress: (e) => {
          chunk.progress = Math.floor((e.loaded / e.total) * 100)
          this.updateTotalProgress(chunks)
        }
      })
    })
    return Promise.all(requests)
  }

  // Pause: cancel every in-flight chunk request.
  pause() {
    // BUG FIX: axios.post returns a Promise, which has no abort();
    // abort through the AbortControllers captured in uploadChunks().
    this.requestList.forEach((controller) => controller.abort())
    this.requestList = []
  }

  // Resume: ask the server which chunks it already has and upload the rest.
  // Arguments default to the state cached by upload(), so resume() may be
  // called with no arguments after a pause.
  async resume(chunks = this.chunks, hash = this.hash) {
    if (!chunks || !hash) {
      throw new Error('nothing to resume: call upload() first')
    }
    const { data: { uploadedList } } = await axios.get(`/verify/${hash}`)
    // Server returns chunk filenames (strings) — normalize to numbers.
    const uploaded = new Set((uploadedList || []).map(Number))
    const remainingChunks = chunks.filter((chunk) => !uploaded.has(chunk.index))
    return this.uploadChunks(remainingChunks, hash)
  }

  // Recompute the overall progress as the mean of per-chunk progress values.
  updateTotalProgress(chunks) {
    if (chunks.length === 0) return // BUG FIX: reduce() on [] without seed throws
    const loaded = chunks
      .map((chunk) => chunk.progress || 0)
      .reduce((acc, cur) => acc + cur, 0)
    this.uploadProgress = Math.floor(loaded / chunks.length)
  }

  // Ask the server to stitch the uploaded chunks into the final file.
  async mergeRequest(hash) {
    return axios.post('/merge', {
      hash,
      filename: this.file.name,
      size: this.file.size
    })
  }

  // Full upload pipeline: slice, hash, verify (instant-upload / partial
  // upload), upload missing chunks, then merge.
  async upload() {
    const chunks = this.createFileChunks()
    const hash = await this.calculateHash()
    // Cache state so resume() can be called without arguments.
    this.chunks = chunks
    this.hash = hash
    const { data: { uploaded, uploadedList } } = await axios.get(`/verify/${hash}`)
    if (uploaded) {
      this.uploadProgress = 100 // instant upload ("秒传")
      return
    }
    // BUG FIX: skip chunks the server already has — the original fetched
    // uploadedList but re-uploaded everything anyway.
    const done = new Set((uploadedList || []).map(Number))
    const pending = chunks.filter((chunk) => !done.has(chunk.index))
    await this.uploadChunks(pending, hash)
    await this.mergeRequest(hash)
  }
}
- 使用示例:
<template>
  <div>
    <input type="file" @change="handleFileChange">
    <button @click="handleUpload">上传</button>
    <button @click="handlePause">暂停</button>
    <button @click="handleResume">继续</button>
    <div class="progress">
      <div>计算hash进度:{{ hashProgress }}%</div>
      <div>上传进度:{{ uploadProgress }}%</div>
    </div>
  </div>
</template>
<script setup>
import { ref, onBeforeUnmount } from 'vue'
import { FileUploader } from './uploader'

const fileUploader = ref(null)
const hashProgress = ref(0)
const uploadProgress = ref(0)

// BUG FIX: the original never read progress back from the uploader, so both
// counters stayed at 0 forever. Poll the uploader's progress fields while
// work is running and stop when it finishes.
let progressTimer = null
const syncProgress = () => {
  hashProgress.value = fileUploader.value?.hashProgress ?? 0
  uploadProgress.value = fileUploader.value?.uploadProgress ?? 0
}
const startProgressSync = () => {
  stopProgressSync()
  progressTimer = setInterval(syncProgress, 200)
}
const stopProgressSync = () => {
  if (progressTimer !== null) {
    clearInterval(progressTimer)
    progressTimer = null
  }
  syncProgress() // capture the final value
}
// Don't leak the interval if the component unmounts mid-upload.
onBeforeUnmount(() => {
  if (progressTimer !== null) clearInterval(progressTimer)
})

const handleFileChange = (e) => {
  const file = e.target.files[0]
  if (!file) return // user cancelled the file dialog
  fileUploader.value = new FileUploader(file)
  hashProgress.value = 0
  uploadProgress.value = 0
}

const handleUpload = async () => {
  if (!fileUploader.value) return
  startProgressSync()
  try {
    await fileUploader.value.upload()
  } catch (error) {
    console.error(error)
  } finally {
    stopProgressSync()
  }
}

const handlePause = () => {
  fileUploader.value?.pause()
}

const handleResume = async () => {
  const uploader = fileUploader.value
  if (!uploader) return
  // BUG FIX: resume(chunks, hash) requires arguments; the original called it
  // with none. Rebuild the chunk list and hash, upload the remainder, then
  // merge (the original never merged after a resume, so the file was never
  // assembled).
  startProgressSync()
  try {
    const chunks = uploader.createFileChunks()
    const hash = await uploader.calculateHash()
    await uploader.resume(chunks, hash)
    await uploader.mergeRequest(hash)
  } catch (error) {
    console.error(error)
  } finally {
    stopProgressSync()
  }
}
</script>
- 后端实现(Node.js):
const express = require('express')
const multiparty = require('multiparty')
const fse = require('fs-extra')
const path = require('path')

const app = express()
// BUG FIX: the /merge route reads req.body as JSON, but no body parser was
// registered, so req.body was undefined. Register express.json() up front.
app.use(express.json())
const UPLOAD_DIR = path.resolve(__dirname, 'uploads')
// Make sure the upload root exists before any request arrives.
fse.ensureDirSync(UPLOAD_DIR)
// 处理分片上传
// Receive one multipart chunk and stash it at uploads/<hash>/<chunkIndex>.
app.post('/upload', async (req, res) => {
  const form = new multiparty.Form()
  form.parse(req, async (err, fields, files) => {
    if (err) {
      console.error(err) // was swallowed silently
      return res.status(500).end()
    }
    try {
      const [chunk] = files.chunk
      const [hash] = fields.hash
      const [chunkIndex] = fields.chunkIndex
      // SECURITY: hash/chunkIndex come from the client and are used to build
      // paths — reject anything that could traverse out of UPLOAD_DIR.
      if (!/^[a-f0-9]+$/i.test(hash) || !/^\d+$/.test(chunkIndex)) {
        return res.status(400).end('invalid hash or chunk index')
      }
      const chunkDir = path.resolve(UPLOAD_DIR, hash)
      // BUG FIX: existsSync + mkdirs raced under concurrent chunk uploads;
      // ensureDir is safe to call repeatedly.
      await fse.ensureDir(chunkDir)
      // overwrite:true makes re-uploading the same chunk (after a retry or
      // resume) idempotent instead of erroring.
      await fse.move(chunk.path, path.resolve(chunkDir, chunkIndex), { overwrite: true })
      res.end('received file chunk')
    } catch (e) {
      console.error(e)
      res.status(500).end()
    }
  })
})
// 合并分片
// Merge the uploaded chunks into the final file.
// BUG FIX: the original opened one write stream per chunk at offset
// `index * size`, where `size` is the TOTAL file size sent by the client —
// every chunk after the first landed at the wrong offset, corrupting the
// output. It also combined `start` with `flags: 'a'` (append ignores start).
// Chunks are now appended strictly in numeric order instead.
app.post('/merge', async (req, res) => {
  try {
    const { hash, filename } = req.body
    const filePath = path.resolve(UPLOAD_DIR, `${hash}${path.extname(filename)}`)
    const chunkDir = path.resolve(UPLOAD_DIR, hash)
    const chunkNames = await fse.readdir(chunkDir)
    // Chunk files are named by their numeric index — sort numerically,
    // not lexicographically.
    chunkNames.sort((a, b) => Number(a) - Number(b))
    // Start from a clean slate in case a previous merge was interrupted.
    await fse.remove(filePath)
    for (const name of chunkNames) {
      const data = await fse.readFile(path.resolve(chunkDir, name))
      await fse.appendFile(filePath, data)
    }
    // Clean up the chunk directory (async; removeSync blocked the event loop).
    await fse.remove(chunkDir)
    res.end('merge success')
  } catch (err) {
    console.error(err)
    res.status(500).end('merge failed')
  }
})
// 验证文件是否已上传
// Report upload state for a file hash: fully uploaded (instant-upload hit),
// partially uploaded (list of chunk indices present), or unknown.
app.get('/verify/:hash', async (req, res) => {
  const { hash } = req.params
  // SECURITY: hash is used in a path — reject traversal attempts.
  if (!/^[a-f0-9]+$/i.test(hash)) {
    return res.status(400).json({ uploaded: false, uploadedList: [] })
  }
  // BUG FIX: the original always answered uploaded:false, so the
  // instant-upload ("秒传") path on the client could never trigger.
  // The merged file is stored as `${hash}${ext}`; the chunk dir is exactly
  // `hash`, so any other entry starting with the hash is the merged file.
  const entries = await fse.readdir(UPLOAD_DIR)
  const merged = entries.some((name) => name.startsWith(hash) && name !== hash)
  if (merged) {
    return res.json({ uploaded: true, uploadedList: [] })
  }
  const chunkDir = path.resolve(UPLOAD_DIR, hash)
  if (fse.existsSync(chunkDir)) {
    // Partial upload: return the chunk filenames (numeric index strings).
    const uploadedList = await fse.readdir(chunkDir)
    res.json({ uploaded: false, uploadedList })
  } else {
    res.json({ uploaded: false, uploadedList: [] })
  }
})
- 优化建议:
// 1. 使用Web Worker计算hash
// hash.worker.js
// hash.worker.js — compute the file MD5 off the main thread. Receives
// { chunks } (as produced by createFileChunks), posts { progress } messages
// while hashing and a final { hash } message when done.
self.importScripts('spark-md5.min.js')
self.onmessage = (e) => {
  const { chunks } = e.data
  const spark = new SparkMD5.ArrayBuffer()
  let count = 0
  const loadNext = (index) => {
    const reader = new FileReader()
    // BUG FIX: the original had no onerror, so a read failure stalled the
    // worker silently with the caller waiting forever.
    reader.onerror = () => self.postMessage({ error: String(reader.error) })
    // Assign handlers before starting the read — the original called
    // readAsArrayBuffer first, which works only by task-ordering luck.
    reader.onload = (ev) => {
      count++
      spark.append(ev.target.result)
      self.postMessage({
        progress: Math.floor((count / chunks.length) * 100)
      })
      if (count === chunks.length) {
        self.postMessage({ hash: spark.end() })
      } else {
        loadNext(count)
      }
    }
    reader.readAsArrayBuffer(chunks[index].file)
  }
  loadNext(0)
}
// 2. 并发控制
// Upload all chunks with at most `limit` requests in flight at once.
// BUG FIX: the original drained the caller's array with shift() and then
// compared the completion counter against the *shrinking* chunks.length,
// which resolved the promise early (with uploads still in flight) or never.
// It also ignored upload failures entirely. This version works on a copy,
// compares against the fixed total, and rejects on the first failure.
function createRequest(chunks, hash, limit = 3) {
  const queue = [...chunks] // don't mutate the caller's array
  const total = queue.length
  if (total === 0) return Promise.resolve()
  let completed = 0
  let active = 0
  return new Promise((resolve, reject) => {
    const launch = () => {
      // Top up the pool until the concurrency limit or the queue is empty.
      while (active < limit && queue.length > 0) {
        const task = queue.shift()
        active++
        uploadChunk(task, hash)
          .then(() => {
            completed++
            active--
            if (completed === total) {
              resolve()
            } else {
              launch()
            }
          })
          .catch(reject) // propagate failures instead of hanging
      }
    }
    launch()
  })
}
// 3. 秒传功能
// Instant-upload ("秒传") probe: asks the server whether a file with this
// hash has already been fully uploaded. Returns the server's boolean verdict.
async function checkFileExist(hash) {
  const response = await axios.get(`/verify/${hash}`)
  return response.data.uploaded
}
// 4. 错误重试
// Upload one chunk, retrying up to `retries` times with linear backoff
// (1s, 2s, ...). Rethrows the last error once all attempts are exhausted.
// With retries = 0 nothing is attempted and undefined is returned,
// matching the original contract.
async function uploadChunkWithRetry(chunk, hash, retries = 3) {
  for (let attempt = 1; attempt <= retries; attempt++) {
    try {
      return await uploadChunk(chunk, hash)
    } catch (error) {
      if (attempt === retries) throw error
      // Back off proportionally to how many attempts have failed so far.
      await new Promise((resolve) => setTimeout(resolve, 1000 * attempt))
    }
  }
}
- 完整的错误处理:
// Error raised when a specific chunk fails to upload; carries the chunk
// so callers can retry just that piece.
class UploadError extends Error {
  constructor(message, chunk) {
    super(message)
    // FIX: without this, err.name reported 'Error', defeating filtering
    // and logging by error name.
    this.name = 'UploadError'
    this.chunk = chunk
  }
}
// End-to-end upload flow: validate, hash, instant-upload check, chunk
// upload, merge — with typed error handling.
// NOTE(review): declared as a free function but references `this.validateFile`,
// `this.calculateHash`, etc. — it only works when bound to an uploader
// instance (e.g. installed as a class method); confirm the intended receiver.
async function upload() {
  try {
    // File-format validation (the exact policy lives in validateFile,
    // which is not visible here).
    if (!this.validateFile()) {
      throw new Error('不支持的文件格式')
    }
    // File-size validation
    if (!this.validateSize()) {
      throw new Error('文件超过最大限制')
    }
    const hash = await this.calculateHash()
    // Instant-upload ("秒传") check: skip uploading entirely when the server
    // already has a file with this hash.
    const exists = await this.checkFileExist(hash)
    if (exists) {
      return { success: true, message: '秒传成功' }
    }
    // Upload the chunks
    const chunks = this.createFileChunks()
    await this.uploadChunks(chunks, hash)
    // Ask the server to merge the chunks into the final file
    await this.mergeRequest(hash)
  } catch (error) {
    if (error instanceof UploadError) {
      // Chunk-specific failure: UploadError carries the failing chunk.
      console.error(`Chunk ${error.chunk} upload failed:`, error.message)
    } else {
      console.error('Upload failed:', error.message)
    }
    // Re-throw so the caller can react (e.g. offer a retry).
    throw error
  }
}
这个实现包含了大文件上传的主要功能,包括:
- 文件分片
- 断点续传
- 暂停/恢复
- 进度监控
- 并发控制
- 错误处理
- 秒传功能
- hash计算
- 文件验证
以上就是大文件上传方案的完整实现与优化思路。