parent
0231d6d30d
commit
ea5c1c1d08
@ -0,0 +1,148 @@ |
||||
package com.jianshui.api.apiutils; |
||||
|
||||
import cn.hutool.core.collection.ListUtil; |
||||
import cn.hutool.core.date.DateUtil; |
||||
import cn.hutool.core.io.FileUtil; |
||||
import cn.hutool.core.io.IoUtil; |
||||
import cn.hutool.core.util.ObjectUtil; |
||||
import cn.hutool.core.util.StrUtil; |
||||
import cn.hutool.http.HttpRequest; |
||||
import cn.hutool.http.HttpResponse; |
||||
import cn.hutool.poi.excel.ExcelReader; |
||||
import cn.hutool.poi.excel.ExcelUtil; |
||||
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper; |
||||
import com.jianshui.common.core.domain.AjaxResult; |
||||
import com.jianshui.invoice.domain.Invoice; |
||||
import com.jianshui.invoice.mapper.InvoiceMapper; |
||||
import lombok.extern.slf4j.Slf4j; |
||||
import org.springframework.web.bind.annotation.GetMapping; |
||||
import org.springframework.web.bind.annotation.RestController; |
||||
import org.springframework.web.multipart.MultipartFile; |
||||
|
||||
import javax.annotation.Resource; |
||||
import java.io.File; |
||||
import java.io.IOException; |
||||
import java.io.InputStream; |
||||
import java.util.Date; |
||||
import java.util.List; |
||||
import java.util.concurrent.ExecutorService; |
||||
import java.util.concurrent.Executors; |
||||
import java.util.concurrent.atomic.AtomicInteger; |
||||
|
||||
/** |
||||
* @author kk |
||||
* 满意数据清洗 |
||||
* @date 2023年11月24日 14:24 |
||||
*/ |
||||
@RestController |
||||
@Slf4j |
||||
public class DataViewManYiController { |
||||
|
||||
@Resource |
||||
private InvoiceMapper invoiceMapper; |
||||
|
||||
|
||||
|
||||
@GetMapping("/getDataManYi") |
||||
public AjaxResult getDataManYi(MultipartFile file) throws IOException { |
||||
ExcelReader reader = ExcelUtil.getReader(file.getInputStream()); |
||||
List<Invoice> invoiceList = reader.readAll(Invoice.class); |
||||
log.info("总条数:{}", invoiceList.size()); |
||||
log.info("下载开始={}", new Date()); |
||||
|
||||
// 计算线程数
|
||||
int threadCount = calculateThreadCount(); |
||||
log.info("线程数:{}", threadCount); |
||||
|
||||
// 拆分任务列表
|
||||
List<List<Invoice>> invoiceListGroup = ListUtil.splitAvg(invoiceList, threadCount); |
||||
|
||||
// 创建线程池
|
||||
ExecutorService executor = Executors.newFixedThreadPool(threadCount); |
||||
// 计数器
|
||||
AtomicInteger downloadCount = new AtomicInteger(0); |
||||
// 提交任务
|
||||
AtomicInteger taskCount = new AtomicInteger(0); |
||||
for (List<Invoice> invoices : invoiceListGroup) { |
||||
executor.execute(() -> { |
||||
for (Invoice invoice : invoices) { |
||||
try { |
||||
downloadInvoice(invoice); |
||||
downloadCount.incrementAndGet(); |
||||
} catch (IOException e) { |
||||
log.error("下载发票失败,记录id,invoiceid={},fphm={}", invoice.getId(), invoice.getFphm(), e); |
||||
} |
||||
taskCount.incrementAndGet(); |
||||
} |
||||
}); |
||||
} |
||||
|
||||
// 等待任务完成
|
||||
while (taskCount.get() < invoiceList.size()) { |
||||
try { |
||||
Thread.sleep(100); |
||||
} catch (InterruptedException e) { |
||||
log.error("任务等待被中断", e); |
||||
break; |
||||
} |
||||
} |
||||
|
||||
// 关闭线程池
|
||||
executor.shutdown(); |
||||
|
||||
log.info("总共下载了{}个文件", downloadCount.get()); |
||||
return AjaxResult.success("文件下载完成"); |
||||
} |
||||
|
||||
private void downloadInvoice(Invoice invoice) throws IOException { |
||||
Date kprq = invoice.getKprq(); |
||||
String fphm = invoice.getFphm(); |
||||
String fpdm = invoice.getFpdm(); |
||||
String fileUrl = invoice.getcOfdUrl(); |
||||
|
||||
// 检查是否缺少必要信息
|
||||
if (ObjectUtil.isNull(kprq) || ObjectUtil.isNull(fphm) || ObjectUtil.isNull(fpdm) || ObjectUtil.isNull(fileUrl)) { |
||||
log.warn("发票信息不完整,记录id,invoiceid={},fphm={}", invoice.getId(), invoice.getFphm()); |
||||
return; |
||||
} |
||||
|
||||
// 构造文件路径
|
||||
String kprqStr = DateUtil.format(kprq, "yyyy-MM-dd"); |
||||
String[] kprqArray = kprqStr.split("-"); |
||||
String savePath = "C://downloadMY/" + kprqArray[0] + "/" + kprqArray[1] + "/" + kprqArray[2] + "/" + fphm.substring(fphm.length() - 3, fphm.length()) + "/" + fpdm + "_" + fphm + ".ofd"; |
||||
File fileRes = new File(savePath); |
||||
|
||||
// 检查文件是否已存在
|
||||
if (fileRes.exists()) { |
||||
log.debug("文件已存在,跳过下载:{}", fileRes); |
||||
return; |
||||
} |
||||
|
||||
// 下载文件
|
||||
HttpResponse response = HttpRequest.get(fileUrl).execute(); |
||||
if (response.isOk()) { |
||||
InputStream inputStream = response.bodyStream(); |
||||
FileUtil.mkdir(fileRes.getParent()); |
||||
IoUtil.copy(inputStream, FileUtil.getOutputStream(fileRes)); |
||||
log.debug("下载成功:{}", fileRes); |
||||
} else { |
||||
log.warn("下载失败:{},原因:{}", fileRes, response.getStatus()); |
||||
} |
||||
} |
||||
|
||||
private int calculateThreadCount() { |
||||
int cpuCores = Runtime.getRuntime().availableProcessors(); |
||||
long maxMemory = Runtime.getRuntime().maxMemory(); |
||||
double memoryRatio = 0.8; |
||||
double diskIORatio = 0.7; |
||||
long availableMemory = (long) (maxMemory * memoryRatio); |
||||
int availableThreads = (int) (cpuCores * diskIORatio); |
||||
int maxThreads = (int) (availableMemory / (1024 * 1024 * 10)); // 每个线程分配10MB内存
|
||||
int maxThreadsPerDiskIO = 20; // 每个磁盘IO操作最多分配20个线程
|
||||
return Math.min(maxThreads, maxThreadsPerDiskIO); |
||||
} |
||||
|
||||
|
||||
|
||||
|
||||
} |
Loading…
Reference in new issue