package com.ewaytek.deepseek.task; import cn.hutool.http.ContentType; import cn.hutool.http.Header; import cn.hutool.http.HttpRequest; import cn.hutool.http.HttpResponse; import com.alibaba.fastjson2.JSON; import com.ewaytek.deepseek.config.DifyConfig; import com.ewaytek.deepseek.doadmin.dto.dify.DIfyImportVerifyDTO; import com.ewaytek.deepseek.doadmin.dto.dify.DifyChatBlockIngDTO; import com.ewaytek.deepseek.doadmin.vo.dify.BlockingVO; import com.ewaytek.deepseek.doadmin.vo.dify.DifyStreamVO; import com.ewaytek.deepseek.doadmin.vo.dify.MetadataVO; import com.ewaytek.deepseek.doadmin.vo.dify.RetrieverResources; import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.util.concurrent.RateLimiter; import lombok.extern.slf4j.Slf4j; import okhttp3.*; import org.apache.commons.collections4.CollectionUtils; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.text.DecimalFormat; import java.util.ArrayList; import java.util.Collections; import java.util.Date; import java.util.List; import java.util.function.Consumer; /** * @author yangtq * @date 2025/2/28 */ @Slf4j public class DifyThread implements Runnable{ private DIfyImportVerifyDTO data; private DifyConfig difyConfig; private static final RateLimiter rateLimiter = RateLimiter.create(5.0); // 每秒最多 5 次调用 private OkHttpClient httpClient; // 静态共享集合,用于存储所有任务的结果 private static final List results = Collections.synchronizedList(new ArrayList<>()); private final Consumer callback; public DifyThread(DIfyImportVerifyDTO data, DifyConfig difyConfig, OkHttpClient httpClient, Consumer callback){ this.data = data; this.difyConfig = difyConfig; this.httpClient=httpClient; this.callback = callback; } @Override public void run() { try { rateLimiter.acquire(); // 获取令牌,阻塞等待直到获取成功 // 调用大模型处理数据 DIfyImportVerifyDTO result = processBatch(data); if(callback!=null){ callback.accept(result); // 调用回调 } // 处理结果 System.out.println("Processed result: " + result.getAnswer()); } catch (Exception e) { System.err.println("Error processing data: " + e.getMessage()); } } private DIfyImportVerifyDTO processBatch(DIfyImportVerifyDTO dto) throws Exception { DifyChatBlockIngDTO difyChatDTO = new DifyChatBlockIngDTO(); difyChatDTO.setQuery(dto.getQuestion()); difyChatDTO.setUser("ewaytek"+System.currentTimeMillis()); difyChatDTO.setResponseMode("streaming"); //构建请求参数json数据 ObjectMapper mapper = new ObjectMapper(); String requestBody = mapper.writeValueAsString(difyChatDTO); Headers headers = new Headers.Builder().add("Authorization", "Bearer " + difyConfig.getApiKey()).add("Content-Type", "application/json").build(); Request request = new Request.Builder().url(difyConfig.getApiHost() + "chat-messages").post(RequestBody.create(MediaType.parse(ContentType.JSON.getValue()), requestBody)) .headers(headers).build(); List retrieverResourcesList=new ArrayList<>(); try (Response response = httpClient.newCall(request).execute()) { if (!response.isSuccessful()) { return dto; } // 处理流式响应 ResponseBody responseBody = response.body(); if (responseBody != null) { StringBuilder responseBuilder = new StringBuilder(); while (!responseBody.source().exhausted()) { String line = responseBody.source().readUtf8Line(); if (line != null && !line.isEmpty()) { if (line.startsWith("data:")) { // 处理 SSE 格式 String eventData = line.substring(5).trim(); // 去掉 "data:" 前缀 DifyStreamVO blockingVO = JSON.parseObject(eventData, DifyStreamVO.class); MetadataVO metadataVO= blockingVO.getMetadata(); if(metadataVO!=null){ if (!CollectionUtils.isEmpty(metadataVO.getRetrieverResources())) { retrieverResourcesList.addAll(metadataVO.getRetrieverResources()); } } responseBuilder.append(blockingVO.getAnswer()); } } } dto.setRetrieverResources(retrieverResourcesList); dto.setAnswer(responseBuilder.toString()); } } catch (IOException e) { log.error(e.getMessage()); } return dto; } }