Commit 0487b6d5 authored by 何处是我家's avatar 何处是我家
Browse files

提交

parents
package com.ewaytek.deepseek.service.dify;
/**
* @author yangtq
* @date 2025/2/25
*/
@FunctionalInterface
public interface KeyStrategyFunction<T, R>{
/**
* Applies this function to the given argument.
*
* @param t the function argument
* @return the function result
*/
R apply(T t);
}
package com.ewaytek.deepseek.service.dify.impl;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.core.util.StrUtil;
import cn.hutool.http.ContentType;
import cn.hutool.http.Header;
import com.alibaba.fastjson2.JSON;
import com.ewaytek.deepseek.common.exception.base.BaseException;
import com.ewaytek.deepseek.doadmin.dto.dify.DifyChatBlockIngDTO;
import com.ewaytek.deepseek.doadmin.dto.dify.DifyResponseDTO;
import com.ewaytek.deepseek.doadmin.dto.doubao.DoubaoChatDTO;
import com.ewaytek.deepseek.doadmin.vo.dify.DifyBlockingVO;
import com.ewaytek.deepseek.doadmin.vo.dify.DifyChatVO;
import com.ewaytek.deepseek.service.dify.DifyApi;
import com.ewaytek.deepseek.service.dify.KeyStrategyFunction;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSourceListener;
import okhttp3.sse.EventSources;
import org.jetbrains.annotations.NotNull;
import retrofit2.Retrofit;
import retrofit2.adapter.rxjava2.RxJava2CallAdapterFactory;
import retrofit2.converter.jackson.JacksonConverterFactory;
import retrofit2.http.Body;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* @author yangtq
* @date 2025/2/25
*/
@Slf4j
public class DifyApiClient {
//自定义api host使用builder的方式构造client
@Getter
private String apiHost;
@Getter
private String apiKey;
@Getter
private DifyApi difyApi;
// 自定义okHttpClient,非自定义为sdk默认OkHttpClient实例
@Getter
private OkHttpClient okHttpClient;
// api key的获取策略
@Getter
private KeyStrategyFunction<List<String>, String> keyStrategy;
/**
* 构造器
*
* @return OpenAiClient.Builder
*/
public static Builder builder() {
return new Builder();
}
/**
* 构造
*
* @param builder
*/
private DifyApiClient(Builder builder) {
if (StrUtil.isBlank(builder.apiHost)) {
builder.apiHost = apiHost;
}
apiHost = builder.apiHost;
apiKey = builder.apiKey;
if (Objects.isNull(builder.okHttpClient)) {
builder.okHttpClient = this.okHttpClient();
} else {
//自定义的okhttpClient 需要增加api keys
builder.okHttpClient = builder.okHttpClient
.newBuilder()
.build();
}
okHttpClient = builder.okHttpClient;
this.difyApi = new Retrofit.Builder()
.baseUrl(apiHost)
.client(okHttpClient)
.addCallAdapterFactory(RxJava2CallAdapterFactory.create())
.addConverterFactory(JacksonConverterFactory.create())
.build().create(DifyApi.class);
}
/**
* 创建默认OkHttpClient
*
* @return
*/
private OkHttpClient okHttpClient() {
return new OkHttpClient
.Builder()
.connectTimeout(30, TimeUnit.SECONDS)
.writeTimeout(30, TimeUnit.SECONDS)
.readTimeout(30, TimeUnit.SECONDS).build();
}
public static final class Builder {
//api keys
private @NotNull String apiKey;
//api请求地址,结尾处有斜杠
private String apiHost;
//自定义OkhttpClient
private OkHttpClient okHttpClient;
// api key的获取策略
private KeyStrategyFunction keyStrategy;
public Builder() {
}
/**
* @param val api请求地址,结尾处有斜杠
* @return Builder对象
*/
public Builder apiHost(String val) {
apiHost = val;
return this;
}
public Builder apiKey(@NotNull String val) {
apiKey = val;
return this;
}
public Builder keyStrategy(KeyStrategyFunction val) {
keyStrategy = val;
return this;
}
public Builder okHttpClient(OkHttpClient val) {
okHttpClient = val;
return this;
}
public DifyApiClient build() {
return new DifyApiClient(this);
}
}
/**
* 流式输出
*
* @param difyRequest
* @param eventSourceListener
* @param <T>
*/
public <T extends DifyResponseDTO> void streamChatCompletion(DifyChatVO difyRequest, EventSourceListener eventSourceListener) {
if (Objects.isNull(eventSourceListener)) {
log.info("EventSourceListener为空");
throw new RuntimeException("EventSourceListener为空");
}
try {
if (CollectionUtil.isNotEmpty(difyRequest.getInputs())) {
difyRequest.setInputs(new HashMap<>());
}
if (CollectionUtil.isNotEmpty(difyRequest.getFiles())) {
difyRequest.setFiles(new ArrayList<>());
}
//构建请求参数json数据
ObjectMapper mapper = new ObjectMapper();
String requestBody = mapper.writeValueAsString(difyRequest);
log.info("请求参数:{}", requestBody);
//创建事件工厂
EventSource.Factory factory = EventSources.createFactory(this.okHttpClient);
Request request = new Request.Builder()
.url(this.apiHost + "chat-messages")
.addHeader(Header.AUTHORIZATION.getValue(), "Bearer " + apiKey)
.addHeader(Header.ACCEPT.getValue(),ContentType.EVENT_STREAM.getValue())
.post(RequestBody.create(MediaType.parse(ContentType.JSON.getValue()), requestBody))
.build();
factory.newEventSource(request, eventSourceListener);
} catch (Exception e) {
log.error("请求参数解析异常:{}", e.getMessage());
}
}
/**
* 流式输出
*
* @param doubaoChatDTO
* @param eventSourceListener
* @param <T>
*/
public <T extends DifyResponseDTO> void streamChatCompletionDoubao(DoubaoChatDTO doubaoChatDTO, EventSourceListener eventSourceListener) {
if (Objects.isNull(eventSourceListener)) {
log.info("EventSourceListener为空");
throw new RuntimeException("EventSourceListener为空");
}
try {
DifyChatBlockIngDTO difyChatDTO = new DifyChatBlockIngDTO();
difyChatDTO.setQuery(doubaoChatDTO.getModel());
difyChatDTO.setUser("ewaytek" + System.currentTimeMillis());
difyChatDTO.setResponseMode("streaming");
//构建请求参数json数据
ObjectMapper mapper = new ObjectMapper();
String requestBody = mapper.writeValueAsString(difyChatDTO);
log.info("请求参数:{}", requestBody);
//创建事件工厂
EventSource.Factory factory = EventSources.createFactory(this.okHttpClient);
Request request = new Request.Builder()
.url(this.apiHost + "chat-messages")
.addHeader(Header.AUTHORIZATION.getValue(), "Bearer " + "app-Lrj9iXzzJN8dXG3DvldKVVCB")
.addHeader(Header.ACCEPT.getValue(),ContentType.EVENT_STREAM.getValue())
.post(RequestBody.create(MediaType.parse(ContentType.JSON.getValue()), requestBody))
.build();
factory.newEventSource(request, eventSourceListener);
} catch (Exception e) {
log.error("请求参数解析异常:{}", e.getMessage());
}
}
/**
* 阻塞式问答
*
* @param difyRequest chat completion
* @return 返回答案
*/
public DifyBlockingVO chatMessages(@Body DifyChatVO difyRequest, String serverKey) {
if (difyRequest.getInputs() == null) {
difyRequest.setInputs(new HashMap<>());
}
if (difyRequest.getFiles() == null) {
difyRequest.setFiles(new ArrayList<>());
}
log.debug(JSON.toJSONString(difyRequest));
// 序列化请求体
ObjectMapper mapper = new ObjectMapper();
String requestBodyJson = "";
try {
requestBodyJson = mapper.writeValueAsString(difyRequest);
} catch (Exception e) {
log.error("请求体序列化失败:{}", e.getMessage());
throw new RuntimeException(e.getMessage());
}
// 创建请求体
RequestBody requestBody = RequestBody.create(MediaType.parse(ContentType.JSON.getValue()), requestBodyJson);
// 创建请求对象,这里动态地将API Key设置到请求头中
Request request = new Request.Builder()
.url(this.apiHost + "chat-messages") // 此处路径根据实际需要进行调整
.addHeader("Authorization", "Bearer " + serverKey) // 设置动态API Key
.post(requestBody)
.build();
DifyBlockingVO response;
try {
// 执行同步请求并获取响应
okhttp3.Response okHttpResponse = okHttpClient.newCall(request).execute();
if (!okHttpResponse.isSuccessful() || okHttpResponse.body() == null) {
log.error("请求失败:HTTP {},message: {}", okHttpResponse.code(), okHttpResponse.message());
throw new BaseException("请求失败:HTTP " + okHttpResponse.code() + " " + okHttpResponse.message());
}
// 反序列化响应体
String responseBody = okHttpResponse.body().string();
response = mapper.readValue(responseBody, DifyBlockingVO.class);
} catch (Exception e) {
log.error("请求异常:{}", e.getMessage());
throw new RuntimeException(e.getMessage());
}
// 返回结果
return response;
}
}
package com.ewaytek.deepseek.service.dify.impl;
import com.alibaba.fastjson2.JSON;
import com.alibaba.nls.client.protocol.tts.SpeechSynthesizerListener;
import com.ewaytek.deepseek.common.config.DeepseekConfig;
import com.ewaytek.deepseek.common.utils.StringUtils;
import com.ewaytek.deepseek.config.DifyConfig;
import com.ewaytek.deepseek.doadmin.dto.dify.DifyChatBlockIngDTO;
import com.ewaytek.deepseek.doadmin.vo.DifyAudiVO;
import com.ewaytek.deepseek.doadmin.vo.dify.DifyBlockingVO;
import com.ewaytek.deepseek.doadmin.vo.dify.DifyStreamVO;
import com.ewaytek.deepseek.doadmin.vo.dify.MetadataVO;
import com.ewaytek.deepseek.service.dify.DifyAudioChatService;
import com.ewaytek.deepseek.service.dify.tts.SpeechRecognizerService;
import com.ewaytek.deepseek.service.dify.tts.SpeechSynthesizer;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import java.io.*;
import java.lang.reflect.Field;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* @author yangtq
* @date 2025/3/28
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class DifyAudioChatServiceImpl implements DifyAudioChatService {
@Autowired
private DifyConfig difyConfig;
@Autowired
private OkHttpClient httpClient;
@Autowired
private SpeechSynthesizer synthesizer;
@Autowired
private SpeechRecognizerService speechRecognizerService;
private DifyChatBlockIngDTO getBifyEntity(InputStream audioStream, String context) throws Exception {
DifyChatBlockIngDTO difyChatBlockIngDTO = new DifyChatBlockIngDTO();
difyChatBlockIngDTO.setUser("ewaytek" + System.currentTimeMillis());
difyChatBlockIngDTO.setResponseMode("streaming");
if (context != null && !context.isEmpty()) {
difyChatBlockIngDTO.setQuery(context);
} else {
difyChatBlockIngDTO.setQuery(audioDispose(audioStream));
}
return difyChatBlockIngDTO;
}
@Override
public void systemChatAudio(InputStream audioStream, String text, OutputStream emitter) throws Exception {
// saveAudioFile(audioStream, "uploaded_audio.wav");
DifyChatBlockIngDTO difyChatBlockIngDTO =getBifyEntity(audioStream,text);
ObjectMapper mapper = new ObjectMapper();
String requestBody = mapper.writeValueAsString(difyChatBlockIngDTO);
Headers headers = new Headers.Builder()
.add("Authorization", "Bearer " + difyConfig.getApiKeyTts())
.add("Content-Type", "application/json")
.build();
Request request = new Request.Builder()
.url(difyConfig.getApiHost() + "chat-messages")
.post(RequestBody.create(MediaType.parse("application/json"), requestBody))
.headers(headers)
.build();
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new RuntimeException("请求失败:HTTP " + response.code() + " " + response.message());
}
String responseBody = response.body().string();
systemDispose(responseBody,emitter);
} catch (Exception e) {
throw new RuntimeException("请求处理失败", e);
}
}
@Override
public void systemChatAudioFlie(InputStream audioStream, String context, ResponseBodyEmitter emitter) throws Exception {
DifyChatBlockIngDTO difyChatBlockIngDTO =getBifyEntity(audioStream,context);
ObjectMapper mapper = new ObjectMapper();
String requestBody = mapper.writeValueAsString(difyChatBlockIngDTO);
Headers headers = new Headers.Builder()
.add("Authorization", "Bearer " + difyConfig.getApiKeyTts())
.add("Content-Type", "application/json")
.build();
Request request = new Request.Builder()
.url(difyConfig.getApiHost() + "chat-messages")
.post(RequestBody.create(MediaType.parse("application/json"), requestBody))
.headers(headers)
.build();
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new RuntimeException("请求失败:HTTP " + response.code() + " " + response.message());
}
// 处理流式响应
ResponseBody responseBody = response.body();
if (responseBody != null) {
StringBuilder responseBuilder = new StringBuilder();
while (!responseBody.source().exhausted()) {
String line = responseBody.source().readUtf8Line();
if(line==null||line.isEmpty()){
continue;
}
if(!line.startsWith("data:")){
continue;
}
String eventData = line.substring(5).trim(); // 去掉 "data:" 前缀
DifyStreamVO blockingVO = JSON.parseObject(eventData, DifyStreamVO.class);
if(StringUtils.isEmpty(blockingVO.getAnswer())){
continue;
}
emitter.send("data: " + blockingVO.getAnswer()+ "\n\n");
// responseBuilder.append(blockingVO.getAnswer());
// Pattern pattern = Pattern.compile("[:;。,!]");
// Matcher matcher = pattern.matcher(responseBuilder.toString());
// if (matcher.find()) {
// Field boolField = null;
// Boolean sessce = false;
// SpeechSynthesizerListener speechSynthesizerListener = synthesizer.process1(responseBuilder.toString());
// while (!sessce) {
// Thread.sleep(100);
// boolField = speechSynthesizerListener.getClass().getDeclaredField("sessce");
// boolField.setAccessible(true);
// sessce = (Boolean) boolField.get(speechSynthesizerListener);
// }
// byte[] bytes = getBytes(speechSynthesizerListener);
// String name=System.currentTimeMillis()+".wav";
// String path=DeepseekConfig.getDownloadPath()+name;
// saveAudioToFile(bytes, path);
// DifyAudiVO difyAudiVO=new DifyAudiVO();
// difyAudiVO.setName(name);
// difyAudiVO.setContext(responseBuilder.toString());
// emitter.send("data: " + JSON.toJSONString(difyAudiVO) + "\n\n");
// responseBuilder.setLength(0);
// }
}
}
} catch (Exception e) {
throw new RuntimeException("请求处理失败", e);
}
}
private void systemDispose(String responseBody,OutputStream emitter) throws IOException, InterruptedException, NoSuchFieldException, IllegalAccessException {
ObjectMapper mapper = new ObjectMapper();
DifyBlockingVO blockingVO = mapper.readValue(responseBody, DifyBlockingVO.class);
List<String> paragraphs = splitTextToParagraphs(blockingVO.getAnswer());
synthesizeText(paragraphs,emitter);
}
private void synthesizeText(List<String> paragraphs,OutputStream outputStream) throws InterruptedException, NoSuchFieldException, IllegalAccessException, IOException {
for (String paragraph : paragraphs) {
Field boolField = null;
Boolean sessce = false;
SpeechSynthesizerListener speechSynthesizerListener = synthesizer.process1(paragraph);
while (!sessce) {
Thread.sleep(100);
boolField = speechSynthesizerListener.getClass().getDeclaredField("sessce");
boolField.setAccessible(true);
sessce = (Boolean) boolField.get(speechSynthesizerListener);
}
byte[] bytes = getBytes(speechSynthesizerListener);
// saveAudioToFile(bytes, "/Users/yang/Downloads/output.wav");
outputStream.write(bytes);
outputStream.flush();
}
outputStream.close();
}
private void saveAudioToFile(byte[] bytes, String fileName) throws IOException {
File file = new File(fileName);
try (FileOutputStream fos = new FileOutputStream(file)) {
fos.write(bytes);
fos.flush();
}
}
private static byte[] getBytes(SpeechSynthesizerListener speechSynthesizerListener) {
Field bytesArrayField = null;
try {
bytesArrayField = speechSynthesizerListener.getClass().getDeclaredField("bytes");
bytesArrayField.setAccessible(true);
byte[] bytes = (byte[]) bytesArrayField.get(speechSynthesizerListener);
return bytes;
} catch (NoSuchFieldException | IllegalAccessException e) {
throw new RuntimeException(e);
}
}
private String audioDispose(InputStream input) throws Exception {
ByteArrayOutputStream output = new ByteArrayOutputStream();
byte[] buffer = new byte[1024 * 4];
int n;
while ((n = input.read(buffer)) != -1) {
output.write(buffer, 0, n);
}
return speechRecognizerService.process(output.toByteArray());
}
private static List<String> splitTextToParagraphs(String text) {
String[] splitResult = text.split("([,.?!])");
List<String> paragraphs = new ArrayList<>();
for (int i = 0; i < splitResult.length; i++) {
String content = splitResult[i].trim();
if (!content.isEmpty()) {
if (i + 1 < splitResult.length && splitResult[i + 1].matches("[,.?!]")) {
paragraphs.add(content + splitResult[i + 1]);
i++;
} else {
paragraphs.add(content);
}
}
}
return paragraphs;
}
}
package com.ewaytek.deepseek.service.dify.impl;
import cn.hutool.cache.impl.TimedCache;
import cn.hutool.core.date.DateUnit;
import cn.hutool.http.ContentType;
import cn.hutool.http.Header;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson2.JSON;
import com.ewaytek.deepseek.common.bean.base.ApiResponse;
import com.ewaytek.deepseek.common.utils.poi.ExcelUtil;
import com.ewaytek.deepseek.config.DifyConfig;
import com.ewaytek.deepseek.config.DifySseEventSourceListener;
import com.ewaytek.deepseek.doadmin.dto.dify.DIfyImportVerifyDTO;
import com.ewaytek.deepseek.doadmin.dto.dify.DifyChatBlockIngDTO;
import com.ewaytek.deepseek.doadmin.dto.dify.DifyChatDTO;
import com.ewaytek.deepseek.doadmin.vo.dify.*;
import com.ewaytek.deepseek.service.dify.DifyChatService;
import com.ewaytek.deepseek.task.DifyThread;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.RateLimiter;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import reactor.core.publisher.Flux;
import javax.annotation.Resource;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.*;
import java.util.stream.Collectors;
/**
* @author yangtq
* @date 2025/2/25
*/
@Service
@Slf4j
@RequiredArgsConstructor
public class DifyChatServiceImpl implements DifyChatService {
private final RateLimiter rateLimiter = RateLimiter.create(5.0);
/**
* 设置sse链接时长缓存
*/
public static final long TIMEOUT = 30 * DateUnit.MINUTE.getMillis();
public static final TimedCache<String, Object> LOCAL_CACHE = new TimedCache<>(TIMEOUT);
@Resource
private DifyApiClient difyApiClient;
@Resource
private DifyConfig difyConfig;
@Resource
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
private OkHttpClient httpClient;
@Override
public ResponseBodyEmitter sseChatPrompt(DifyChatVO difyChatVO) {
ResponseBodyEmitter sseEmitter = this.getResponseBodyEmitter(difyChatVO);
DifySseEventSourceListener listener = new DifySseEventSourceListener(sseEmitter);
difyApiClient.streamChatCompletion(difyChatVO, listener);
return sseEmitter;
}
@Override
public ApiResponse<List<ConversationsVO.ConversationInfoVO>> conversations(Integer limit, Boolean pinned) {
String url = String.format("%s?limit=%d&pinned=%s",
difyConfig.getConversationUrl(),
limit,
pinned);
HttpRequest request = HttpRequest.get(url)
.header(Header.AUTHORIZATION, "Bearer " + difyConfig.getApiKey())
.header(Header.CONTENT_TYPE, "application/json")
.timeout(60000);
try (HttpResponse response = request.execute()) {
if (response.isOk()) {
String responseBody = response.body();
log.info("Response: {}", responseBody);
ConversationsVO conversationsVO = JSON.parseObject(responseBody, ConversationsVO.class);
return ApiResponse.success(conversationsVO.getData());
} else {
log.error("Request failed with status code: {}", response.getStatus());
return ApiResponse.fail("Request failed with status code: " + response.getStatus());
}
} catch (Exception e) {
log.error("Error during request: ", e);
return ApiResponse.fail("Error during request: " + e.getMessage());
}
}
@Override
public void importVerify2(HttpServletRequest request, HttpServletResponse response, MultipartFile file) throws IOException, InterruptedException {
ExcelUtil<DIfyImportVerifyDTO> util = new ExcelUtil<DIfyImportVerifyDTO>(DIfyImportVerifyDTO.class);
List<DIfyImportVerifyDTO> list = util.importExcel(file.getInputStream());
if (!CollectionUtils.isEmpty(list)) {
// 动态设置线程池大小
int corePoolSize = Runtime.getRuntime().availableProcessors();
int maximumPoolSize = corePoolSize * 2;
long keepAliveTime = 60L;
TimeUnit unit = TimeUnit.SECONDS;
BlockingQueue<Runnable> workQueue = new LinkedBlockingQueue<>(100);
ExecutorService pool = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
workQueue
);
int batchSize = 10; // 每批次提交的任务数
int totalTasks = list.size();
List<DIfyImportVerifyDTO> results = new ArrayList<>();
for (int i = 0; i < totalTasks; i += batchSize) {
int end = Math.min(i + batchSize, totalTasks);
List<DIfyImportVerifyDTO> batch = list.subList(i, end);
List<Callable<DIfyImportVerifyDTO>> tasks = batch.stream()
.map(dto -> (Callable<DIfyImportVerifyDTO>) () -> processBatch(dto))
.collect(Collectors.toList());
List<Future<DIfyImportVerifyDTO>> futures = pool.invokeAll(tasks);
for (Future<DIfyImportVerifyDTO> future : futures) {
try {
results.add(future.get());
} catch (InterruptedException | ExecutionException e) {
log.error("Error processing batch", e);
}
}
}
util.exportExcel(results, "dify");
// 导出结果到 Excel
util.exportExcel(response, results, "dify");
// 关闭线程池
pool.shutdown();
}
}
public DIfyImportVerifyDTO processBatch(DIfyImportVerifyDTO dto) throws JsonProcessingException {
DifyChatBlockIngDTO difyChatDTO = new DifyChatBlockIngDTO();
difyChatDTO.setQuery(dto.getQuestion());
difyChatDTO.setUser("ewaytek" + System.currentTimeMillis());
difyChatDTO.setResponseMode("streaming");
//构建请求参数json数据
ObjectMapper mapper = new ObjectMapper();
String requestBody = mapper.writeValueAsString(difyChatDTO);
Headers headers = new Headers.Builder().add("Authorization", "Bearer " + difyConfig.getApiKey()).add("Content-Type", "application/json").build();
Request request = new Request.Builder().url(difyConfig.getApiHost() + "chat-messages").post(RequestBody.create(MediaType.parse(ContentType.JSON.getValue()), requestBody))
.headers(headers).build();
List<RetrieverResources> retrieverResourcesList = new ArrayList<>();
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
return dto;
}
// 处理流式响应
ResponseBody responseBody = response.body();
if (responseBody != null) {
StringBuilder responseBuilder = new StringBuilder();
while (!responseBody.source().exhausted()) {
String line = responseBody.source().readUtf8Line();
if (line != null && !line.isEmpty()) {
if (line.startsWith("data:")) { // 处理 SSE 格式
String eventData = line.substring(5).trim(); // 去掉 "data:" 前缀
DifyStreamVO blockingVO = JSON.parseObject(eventData, DifyStreamVO.class);
MetadataVO metadataVO = blockingVO.getMetadata();
if (metadataVO != null) {
if (!CollectionUtils.isEmpty(metadataVO.getRetrieverResources())) {
retrieverResourcesList.addAll(metadataVO.getRetrieverResources());
}
}
responseBuilder.append(blockingVO.getAnswer());
}
}
}
dto.setRetrieverResources(retrieverResourcesList);
dto.setAnswer(responseBuilder.toString());
}
} catch (IOException e) {
log.error(e.getMessage());
}
return dto;
}
@Override
public ApiResponse importVerify3(MultipartFile file) throws IOException, InterruptedException, ExecutionException {
ExcelUtil<DIfyImportVerifyDTO> util = new ExcelUtil<>(DIfyImportVerifyDTO.class);
List<DIfyImportVerifyDTO> dataList = util.importExcel(file.getInputStream());
if (!CollectionUtils.isEmpty(dataList)) {
for (DIfyImportVerifyDTO ifyImportVerifyDTO : dataList) {
processBatch(ifyImportVerifyDTO);
}
return util.exportExcel(dataList, "import_verify_results");
}
return ApiResponse.fail();
}
@Override
public ApiResponse importVerify(MultipartFile file) throws IOException, InterruptedException, ExecutionException {
ExcelUtil<DIfyImportVerifyDTO> util = new ExcelUtil<>(DIfyImportVerifyDTO.class);
List<DIfyImportVerifyDTO> dataList = util.importExcel(file.getInputStream());
if (!CollectionUtils.isEmpty(dataList)) {
// 提交任务到线程池
List<CompletableFuture<DIfyImportVerifyDTO>> futures = dataList.stream()
.map(data -> CompletableFuture.supplyAsync(() -> {
DifyThread thread = new DifyThread(data, difyConfig, httpClient, null);
thread.run(); // 执行任务
return data; // 返回处理后的数据
}, threadPoolTaskExecutor))
.collect(Collectors.toList());
// 等待所有任务完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0]));
allFutures.get(); // 阻塞等待所有任务完成
// 收集所有结果
List<DIfyImportVerifyDTO> results = futures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList());
// 导出结果到 Excel 文件
return util.exportExcel(results, "import_verify_results");
}
return ApiResponse.fail();
}
/**
* 创建sse连接
*
* @param chatRequest
* @return
*/
private ResponseBodyEmitter getResponseBodyEmitter(DifyChatVO chatRequest) {
//0L设置允许超时
ResponseBodyEmitter sseEmitter = new ResponseBodyEmitter(0L);
sseEmitter.onCompletion(() -> {
log.info("会话[{}]sse结束连接......", chatRequest.getConversationId());
LOCAL_CACHE.remove(chatRequest.getConversationId());
});
//超时回调
sseEmitter.onTimeout(() -> {
log.error("会话[{}]sse连接超时......", chatRequest.getConversationId());
});
//异常回调
sseEmitter.onError(
throwable -> {
log.error("会话[{}]sse连接失败......", chatRequest.getConversationId());
}
);
LOCAL_CACHE.put(chatRequest.getConversationId(), sseEmitter);
log.info("会话[{}]创建sse连接成功!", chatRequest.getConversationId());
return sseEmitter;
}
}
package com.ewaytek.deepseek.service.dify.impl;
import cn.hutool.core.util.RandomUtil;
import com.ewaytek.deepseek.service.dify.KeyStrategyFunction;
import java.util.List;
/**
* @author yangtq
* @date 2025/2/25
*/
public class KeyRandomStrategy implements KeyStrategyFunction<List<String>, String> {
@Override
public String apply(List<String> apiKeys) {
return RandomUtil.randomEle(apiKeys);
}
}
package com.ewaytek.deepseek.service.dify.tts;
import com.alibaba.nls.client.protocol.InputFormatEnum;
import com.alibaba.nls.client.protocol.NlsClient;
import com.alibaba.nls.client.protocol.SampleRateEnum;
import com.alibaba.nls.client.protocol.asr.SpeechRecognizer;
import com.alibaba.nls.client.protocol.asr.SpeechRecognizerListener;
import com.alibaba.nls.client.protocol.asr.SpeechRecognizerResponse;
import com.ewaytek.deepseek.config.AliTtsConfig;
import lombok.Data;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.util.concurrent.CountDownLatch;
/**
* @author yangtq
* @date 2025/3/28
*/
@Data
@Component
public class SpeechRecognizerService {
private static final Logger logger = LoggerFactory.getLogger(SpeechRecognizerService.class);
@Autowired
private AliTtsConfig aliTtsConfig;
public interface RecognitionCallback {
void onResult(String result);
void onError(String errorMessage);
}
private static SpeechRecognizerListener getRecognizerListener(RecognitionCallback callback) {
return new SpeechRecognizerListener() {
@Override
public void onRecognitionResultChanged(SpeechRecognizerResponse response) {
if (callback != null) {
callback.onResult(response.getRecognizedText());
}
}
@Override
public void onRecognitionCompleted(SpeechRecognizerResponse response) {
if (callback != null) {
callback.onResult(response.getRecognizedText());
}
}
@Override
public void onStarted(SpeechRecognizerResponse speechRecognizerResponse) {}
@Override
public void onFail(SpeechRecognizerResponse response) {
if (callback != null) {
callback.onError(response.getStatusText());
}
}
};
}
public String process(byte[] audioData) {
final String[] result = {null};
final CountDownLatch latch = new CountDownLatch(1);
SpeechRecognizer recognizer = null;
try {
SpeechRecognizerListener listener = getRecognizerListener(new RecognitionCallback() {
@Override
public void onResult(String text) {
result[0] = text;
latch.countDown();
}
@Override
public void onError(String errorMessage) {
logger.error("Recognition error: {}", errorMessage);
latch.countDown();
}
});
recognizer = new SpeechRecognizer(AliTtsConfig.getNlsClient(), listener);
recognizer.setAppKey(aliTtsConfig.getAppKey());
recognizer.setFormat(InputFormatEnum.PCM);
recognizer.setSampleRate(SampleRateEnum.SAMPLE_RATE_8K);
recognizer.setEnableIntermediateResult(true);
recognizer.addCustomedParam("enable_voice_detection", true);
long now = System.currentTimeMillis();
recognizer.start();
logger.info("ASR start latency : " + (System.currentTimeMillis() - now) + " ms");
byte[] chunk = new byte[3200];
for (int i = 0; i < audioData.length; i += chunk.length) {
int length = Math.min(chunk.length, audioData.length - i);
System.arraycopy(audioData, i, chunk, 0, length);
recognizer.send(chunk, length);
Thread.sleep(10); // 模拟实时发送
}
now = System.currentTimeMillis();
recognizer.stop();
logger.info("ASR stop latency : " + (System.currentTimeMillis() - now) + " ms");
latch.await();
} catch (Exception e) {
logger.error("Error during speech recognition", e);
} finally {
if (recognizer != null) {
recognizer.close();
}
}
return result[0];
}
}
package com.ewaytek.deepseek.service.dify.tts;
import com.alibaba.nls.client.protocol.OutputFormatEnum;
import com.alibaba.nls.client.protocol.SampleRateEnum;
import com.alibaba.nls.client.protocol.tts.SpeechSynthesizerListener;
import com.alibaba.nls.client.protocol.tts.SpeechSynthesizerResponse;
import com.ewaytek.deepseek.config.AliTtsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.IOException;
import java.io.OutputStream;
import java.lang.reflect.Field;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.concurrent.CountDownLatch;
/**
* @author yangtq
* @date 2025/3/28
*/
@Component
public class SpeechSynthesizer {
private static final Logger logger = LoggerFactory.getLogger(SpeechSynthesizer.class);
@Resource
AliTtsConfig aliTtsConfig;
private static SpeechSynthesizerListener getSynthesizerListener(Date startTime, OutputStream outputStream) {
return new SpeechSynthesizerListener() {
private boolean firstRecvBinary = true;
private List<byte[]> bytesList = new ArrayList<>();
private byte[] bytes = null;
private Boolean sessce=false;
@Override
public void onComplete(SpeechSynthesizerResponse response) {
bytes = this.byteMerger(bytesList);
sessce=true;
logger.info("name: " + response.getName() + ", status: " + response.getStatus() + ", task_id: " + response.getTaskId());
}
@Override
public void onMessage(ByteBuffer message) {
if (firstRecvBinary) {
firstRecvBinary = false;
}
byte[] bytesArray = new byte[message.remaining()];
message.get(bytesArray, 0, bytesArray.length);
bytesList.add(bytesArray);
}
@Override
public void onFail(SpeechSynthesizerResponse response) {
logger.info("task_id: " + response.getTaskId() + ", status: " + response.getStatus() + ", status_text: " + response.getStatusText());
}
@Override
public void onMetaInfo(SpeechSynthesizerResponse response) {
logger.info("MetaInfo event:{}", response.getTaskId());
}
private byte[] byteMerger(List<byte[]> byteList) {
int lengthByte = 0;
for (int i = 0; i < byteList.size(); i++) {
lengthByte += byteList.get(i).length;
}
byte[] allByte = new byte[lengthByte];
int countLength = 0;
for (int i = 0; i < byteList.size(); i++) {
byte[] b = byteList.get(i);
System.arraycopy(b, 0, allByte, countLength, b.length);
countLength += b.length;
}
return allByte;
}
};
}
public void process(String word, OutputStream outputStream, Boolean sessce) {
word = word.replaceAll("<.*?>", "");
logger.info("word:" + word);
com.alibaba.nls.client.protocol.tts.SpeechSynthesizer synthesizer = null;
try {
SpeechSynthesizerListener speechSynthesizerListener = getSynthesizerListener(new Date(), outputStream);
synthesizer = new com.alibaba.nls.client.protocol.tts.SpeechSynthesizer(AliTtsConfig.getNlsClient(), speechSynthesizerListener);
synthesizer.setAppKey(aliTtsConfig.getAppKey());
synthesizer.setFormat(OutputFormatEnum.WAV);
synthesizer.setSampleRate(SampleRateEnum.SAMPLE_RATE_8K);
synthesizer.setVoice(aliTtsConfig.getVoice());
synthesizer.setVolume(aliTtsConfig.getVolume());
synthesizer.setPitchRate(aliTtsConfig.getPitch_rate());
synthesizer.setSpeechRate(aliTtsConfig.getPitch_rate());
synthesizer.setText(word);
synthesizer.addCustomedParam("enable_subtitle", true);
long start = System.currentTimeMillis();
synthesizer.start();
logger.info("tts start latency " + word + " " + (System.currentTimeMillis() - start) + " ms");
synthesizer.waitForComplete(60000);
logger.info("tts stop latency " + word + " " + (System.currentTimeMillis() - start) + " ms");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (synthesizer != null) {
synthesizer.close();
}
}
}
public SpeechSynthesizerListener process1(String word) {
word = word.replaceAll("<.*?>", "");
logger.info("word:" + word);
com.alibaba.nls.client.protocol.tts.SpeechSynthesizer synthesizer = null;
try {
SpeechSynthesizerListener speechSynthesizerListener = getSynthesizerListener(new Date(), null);
synthesizer = new com.alibaba.nls.client.protocol.tts.SpeechSynthesizer(AliTtsConfig.getNlsClient(), speechSynthesizerListener);
synthesizer.setAppKey(aliTtsConfig.getAppKey());
synthesizer.setFormat(OutputFormatEnum.WAV);
synthesizer.setSampleRate(SampleRateEnum.SAMPLE_RATE_8K);
synthesizer.setVoice(aliTtsConfig.getVoice());
synthesizer.setVolume(aliTtsConfig.getVolume());
synthesizer.setPitchRate(aliTtsConfig.getPitch_rate());
synthesizer.setSpeechRate(aliTtsConfig.getPitch_rate());
synthesizer.setText(word);
synthesizer.addCustomedParam("enable_subtitle", true);
long start = System.currentTimeMillis();
synthesizer.start();
logger.info("tts start latency " + word + " " + (System.currentTimeMillis() - start) + " ms");
synthesizer.waitForComplete(60000);
logger.info("tts stop latency " + word + " " + (System.currentTimeMillis() - start) + " ms");
return speechSynthesizerListener;
} catch (Exception e) {
e.printStackTrace();
} finally {
if (synthesizer != null) {
synthesizer.close();
}
}
return null;
}
}
package com.ewaytek.deepseek.service.dify.tts;
import com.alibaba.nls.client.protocol.OutputFormatEnum;
import com.alibaba.nls.client.protocol.SampleRateEnum;
import com.alibaba.nls.client.protocol.tts.SpeechSynthesizerListener;
import com.alibaba.nls.client.protocol.tts.SpeechSynthesizerResponse;
import com.ewaytek.deepseek.config.AliTtsConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import javax.annotation.Resource;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
/**
* @author yangtq
* @date 2025/3/28
*/
public class SpeechSynthesizerDemo {
private static final Logger logger = LoggerFactory.getLogger(SpeechSynthesizerDemo.class);
AliTtsConfig aliTtsConfig;
public SpeechSynthesizerDemo(AliTtsConfig aliTtsConfig) {
this.aliTtsConfig = aliTtsConfig;
}
private static SpeechSynthesizerListener getSynthesizerListener(Date startTime, OutputStream outputStream) {
return new SpeechSynthesizerListener() {
private boolean firstRecvBinary = true;
private List<byte[]> bytesList = new ArrayList<>();
private byte[] bytes = null;
private Boolean sessce=false;
@Override
public void onComplete(SpeechSynthesizerResponse response) {
bytes = this.byteMerger(bytesList);
sessce=true;
logger.info("name: " + response.getName() + ", status: " + response.getStatus() + ", task_id: " + response.getTaskId());
}
@Override
public void onMessage(ByteBuffer message) {
if (firstRecvBinary) {
firstRecvBinary = false;
}
byte[] bytesArray = new byte[message.remaining()];
message.get(bytesArray, 0, bytesArray.length);
bytesList.add(bytesArray);
}
@Override
public void onFail(SpeechSynthesizerResponse response) {
logger.info("task_id: " + response.getTaskId() + ", status: " + response.getStatus() + ", status_text: " + response.getStatusText());
}
@Override
public void onMetaInfo(SpeechSynthesizerResponse response) {
logger.info("MetaInfo event:{}", response.getTaskId());
}
private byte[] byteMerger(List<byte[]> byteList) {
int lengthByte = 0;
for (int i = 0; i < byteList.size(); i++) {
lengthByte += byteList.get(i).length;
}
byte[] allByte = new byte[lengthByte];
int countLength = 0;
for (int i = 0; i < byteList.size(); i++) {
byte[] b = byteList.get(i);
System.arraycopy(b, 0, allByte, countLength, b.length);
countLength += b.length;
}
return allByte;
}
};
}
public void process(String word, OutputStream outputStream, Boolean sessce) {
word = word.replaceAll("<.*?>", "");
logger.info("word:" + word);
com.alibaba.nls.client.protocol.tts.SpeechSynthesizer synthesizer = null;
try {
SpeechSynthesizerListener speechSynthesizerListener = getSynthesizerListener(new Date(), outputStream);
synthesizer = new com.alibaba.nls.client.protocol.tts.SpeechSynthesizer(AliTtsConfig.getNlsClient(), speechSynthesizerListener);
synthesizer.setAppKey(aliTtsConfig.getAppKey());
synthesizer.setFormat(OutputFormatEnum.WAV);
synthesizer.setSampleRate(SampleRateEnum.SAMPLE_RATE_8K);
synthesizer.setVoice(aliTtsConfig.getVoice());
synthesizer.setVolume(aliTtsConfig.getVolume());
synthesizer.setPitchRate(aliTtsConfig.getPitch_rate());
synthesizer.setSpeechRate(aliTtsConfig.getPitch_rate());
synthesizer.setText(word);
synthesizer.addCustomedParam("enable_subtitle", true);
long start = System.currentTimeMillis();
synthesizer.start();
logger.info("tts start latency " + word + " " + (System.currentTimeMillis() - start) + " ms");
synthesizer.waitForComplete(60000);
logger.info("tts stop latency " + word + " " + (System.currentTimeMillis() - start) + " ms");
} catch (Exception e) {
e.printStackTrace();
} finally {
if (synthesizer != null) {
synthesizer.close();
}
}
}
public SpeechSynthesizerListener process1(String word) {
word = word.replaceAll("<.*?>", "");
logger.info("word:" + word);
com.alibaba.nls.client.protocol.tts.SpeechSynthesizer synthesizer = null;
try {
SpeechSynthesizerListener speechSynthesizerListener = getSynthesizerListener(new Date(), null);
synthesizer = new com.alibaba.nls.client.protocol.tts.SpeechSynthesizer(AliTtsConfig.getNlsClient(), speechSynthesizerListener);
synthesizer.setAppKey(aliTtsConfig.getAppKey());
synthesizer.setFormat(OutputFormatEnum.WAV);
synthesizer.setSampleRate(SampleRateEnum.SAMPLE_RATE_8K);
synthesizer.setVoice(aliTtsConfig.getVoice());
synthesizer.setVolume(aliTtsConfig.getVolume());
synthesizer.setPitchRate(aliTtsConfig.getPitch_rate());
synthesizer.setSpeechRate(aliTtsConfig.getPitch_rate());
synthesizer.setText(word);
synthesizer.addCustomedParam("enable_subtitle", true);
long start = System.currentTimeMillis();
synthesizer.start();
logger.info("tts start latency " + word + " " + (System.currentTimeMillis() - start) + " ms");
synthesizer.waitForComplete(60000);
logger.info("tts stop latency " + word + " " + (System.currentTimeMillis() - start) + " ms");
return speechSynthesizerListener;
} catch (Exception e) {
e.printStackTrace();
} finally {
if (synthesizer != null) {
synthesizer.close();
}
}
return null;
}
}
package com.ewaytek.deepseek.service.dify.workflows;
import com.ewaytek.deepseek.common.bean.base.ApiResponse;
import org.springframework.web.multipart.MultipartFile;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
/**
* @author yangtq
* @date 2025/3/6
*/
public interface DifyWorkflowsService {
public ApiResponse importVerify(MultipartFile file) throws IOException, InterruptedException, ExecutionException;
}
package com.ewaytek.deepseek.service.dify.workflows.impl;
import com.ewaytek.deepseek.common.bean.base.ApiResponse;
import com.ewaytek.deepseek.common.utils.poi.ExcelUtil;
import com.ewaytek.deepseek.config.DifyConfig;
import com.ewaytek.deepseek.doadmin.dto.dify.DIfyImportVerifyDTO;
import com.ewaytek.deepseek.doadmin.vo.dify.workflows.WorkflowsExecVO;
import com.ewaytek.deepseek.service.dify.impl.DifyApiClient;
import com.ewaytek.deepseek.service.dify.workflows.DifyWorkflowsService;
import com.ewaytek.deepseek.task.DifyThread;
import com.ewaytek.deepseek.task.DifyWorkflowsThread;
import okhttp3.OkHttpClient;
import org.apache.commons.collections4.CollectionUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.web.multipart.MultipartFile;
import javax.annotation.Resource;
import java.io.IOException;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
/**
* @author yangtq
* @date 2025/3/6
*/
@Service
public class DifyWorkflowsServiceImpl implements DifyWorkflowsService {
@Resource
private DifyConfig difyConfig;
@Resource
private ThreadPoolTaskExecutor threadPoolTaskExecutor;
@Autowired
private OkHttpClient httpClient;
@Override
public ApiResponse importVerify(MultipartFile file) throws IOException, InterruptedException, ExecutionException {
ExcelUtil<WorkflowsExecVO> util1 = new ExcelUtil<>(WorkflowsExecVO.class);
ExcelUtil<DIfyImportVerifyDTO> util = new ExcelUtil<>(DIfyImportVerifyDTO.class);
List<DIfyImportVerifyDTO> dataList = util.importExcel(file.getInputStream());
if (!CollectionUtils.isEmpty(dataList)) {
// 提交任务到线程池
List<CompletableFuture<WorkflowsExecVO>> futures = dataList.stream()
.filter(data -> data.getQuestion() != null && !data.getQuestion().isEmpty())
.map(data -> CompletableFuture.supplyAsync(() -> {
DifyWorkflowsThread thread = new DifyWorkflowsThread(data, difyConfig, httpClient);
thread.run(); // 执行任务
return thread.getWorkflowsExecVO(); // 返回处理后的 WorkflowsExecVO
}, threadPoolTaskExecutor))
.collect(Collectors.toList());
// 等待所有任务完成
CompletableFuture<Void> allFutures = CompletableFuture.allOf(
futures.toArray(new CompletableFuture[0]));
allFutures.get(); // 阻塞等待所有任务完成
// 收集所有结果
List<WorkflowsExecVO> results = futures.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull) // 确保结果不为 null
.collect(Collectors.toList());
// 导出结果到 Excel 文件
return util1.exportExcel(results, "DifyWorkflows");
}
return ApiResponse.fail();
}
}
package com.ewaytek.deepseek.service.doubao;
import com.ewaytek.deepseek.doadmin.dto.doubao.DoubaoChatDTO;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import java.io.IOException;
/**
* @author yangtq
* @date 2025/4/18
*/
public interface DoubaoService {
ResponseBodyEmitter sseChatPrompt(DoubaoChatDTO doubaoChatDTO) throws IOException;
}
package com.ewaytek.deepseek.service.doubao;
import com.alibaba.fastjson2.JSON;
import com.ewaytek.deepseek.config.DifyConfig;
import com.ewaytek.deepseek.config.DoubaoSseEventSourceListener;
import com.ewaytek.deepseek.doadmin.dto.dify.DifyChatBlockIngDTO;
import com.ewaytek.deepseek.doadmin.dto.doubao.*;
import com.ewaytek.deepseek.doadmin.vo.dify.BlockingVO;
import com.ewaytek.deepseek.doadmin.vo.dify.DifyStreamVO;
import com.ewaytek.deepseek.service.dify.impl.DifyApiClient;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import okhttp3.sse.EventSource;
import okhttp3.sse.EventSources;
import org.springframework.stereotype.Service;
import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
import javax.annotation.Resource;
import java.io.*;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
@Service
@Slf4j
@RequiredArgsConstructor
public class DoubaoServiceImpl implements DoubaoService {
@Resource
private DifyConfig difyConfig;
@Resource
private OkHttpClient httpClient;
@Resource
private DifyApiClient difyApiClient;
@Override
public ResponseBodyEmitter sseChatPrompt(DoubaoChatDTO doubaoChatDTO) throws IOException {
ResponseBodyEmitter sseEmitter = new ResponseBodyEmitter(60000L); // 增加超时时间到60秒
sseEmitter.onCompletion(() -> {
log.info("会话[{}]sse结束连接......", doubaoChatDTO.getModel());
});
//超时回调
sseEmitter.onTimeout(() -> {
log.error("会话[{}]sse连接超时......", doubaoChatDTO.getModel());
});
//异常回调
sseEmitter.onError(
throwable -> {
log.error("会话[{}]sse连接失败......", doubaoChatDTO.getModel());
}
);
String uid = UUID.randomUUID().toString();
DoubaoSseEventSourceListener doubaoSseEventSourceListener=new DoubaoSseEventSourceListener(sseEmitter,uid);
difyApiClient.streamChatCompletionDoubao( doubaoChatDTO, doubaoSseEventSourceListener);
return sseEmitter;
}
}
//package com.ewaytek.deepseek.service.doubao;
//
//import cn.hutool.core.lang.UUID;
//import com.alibaba.fastjson2.JSON;
//import com.ewaytek.deepseek.config.DifyConfig;
//import com.ewaytek.deepseek.doadmin.dto.dify.DifyChatBlockIngDTO;
//import com.ewaytek.deepseek.doadmin.dto.doubao.*;
//import com.ewaytek.deepseek.doadmin.vo.dify.DifyStreamVO;
//import com.fasterxml.jackson.databind.ObjectMapper;
//import lombok.RequiredArgsConstructor;
//import lombok.extern.slf4j.Slf4j;
//import okhttp3.*;
//import org.springframework.stereotype.Service;
//import org.springframework.web.servlet.mvc.method.annotation.ResponseBodyEmitter;
//
//import javax.annotation.Resource;
//import java.io.IOException;
//import java.util.ArrayList;
//import java.util.List;
//import java.util.concurrent.atomic.AtomicBoolean;
//
//@Service
//@Slf4j
//@RequiredArgsConstructor
//public class DoubaoServiceImpl2 implements DoubaoService {
//
// @Resource
// private DifyConfig difyConfig;
//
// @Resource
// private OkHttpClient httpClient;
//
// @Override
// public ResponseBodyEmitter sseChatPrompt(DoubaoChatDTO doubaoChatDTO) throws IOException {
// ResponseBodyEmitter sseEmitter = new ResponseBodyEmitter(0L); // 设置合理的超时时间(30秒)
// AtomicBoolean isClosed = new AtomicBoolean(false); // 用于标记连接是否已经关闭
//
// // 设置 SSE 连接的回调
// sseEmitter.onCompletion(() -> {
// log.info("会话[{}] SSE 连接已关闭", doubaoChatDTO.getModel());
// isClosed.set(true);
// });
// sseEmitter.onTimeout(() -> {
// log.error("会话[{}] SSE 连接超时", doubaoChatDTO.getModel());
// isClosed.set(true);
// sseEmitter.complete();
// });
// sseEmitter.onError(throwable -> {
// log.error("会话[{}] SSE 连接发生错误", doubaoChatDTO.getModel(), throwable);
// isClosed.set(true);
// sseEmitter.completeWithError(throwable);
// });
//
// // 启动心跳线程
// startHeartbeatThread(sseEmitter, isClosed);
//
// String uid = UUID.randomUUID().toString();
// DifyChatBlockIngDTO difyChatDTO = new DifyChatBlockIngDTO();
// difyChatDTO.setQuery(doubaoChatDTO.getModel());
// difyChatDTO.setUser("ewaytek" + System.currentTimeMillis());
// difyChatDTO.setResponseMode("streaming");
//
// ObjectMapper mapper = new ObjectMapper();
// String requestBody = mapper.writeValueAsString(difyChatDTO);
// Headers headers = new Headers.Builder()
// .add("Authorization", "Bearer " + difyConfig.getApiKeyTts())
// .add("Content-Type", "application/json")
// .build();
//
// Request request = new Request.Builder()
// .url(difyConfig.getApiHost() + "chat-messages")
// .post(RequestBody.create(MediaType.parse("application/json"), requestBody))
// .headers(headers)
// .build();
//
// // 使用异步调用
// httpClient.newCall(request).enqueue(new Callback() {
// @Override
// public void onFailure(Call call, IOException e) {
// log.error("调用第三方 API 失败", e);
// sseEmitter.completeWithError(e);
// }
//
// @Override
// public void onResponse(Call call, Response response) throws IOException {
// if (!response.isSuccessful()) {
// log.error("请求第三方 API 失败,状态码: {}", response.code());
// sseEmitter.completeWithError(new IOException("请求第三方 API 失败,状态码: " + response.code()));
// return;
// }
//
// try (ResponseBody responseBody = response.body()) {
// if (responseBody == null) {
// log.error("第三方 API 返回空响应体");
// sseEmitter.completeWithError(new IOException("第三方 API 返回空响应体"));
// return;
// }
//
// log.info("调用第三方 "+responseBody);
// while (!responseBody.source().exhausted()) {
// String line = responseBody.source().readUtf8Line();
// if (line == null || line.isEmpty()) {
// continue;
// }
// log.info("dify {}:", line);
// if (line.startsWith("data:")) {
// String eventData = line.substring(5).trim(); // 去掉 "data:" 前缀
// DifyStreamVO blockingVO = JSON.parseObject(eventData, DifyStreamVO.class);
//
// if ("message_end".equals(blockingVO.getEvent())) {
// sendCompletionMessage(sseEmitter, uid);
// break;
// }
//
// // 解析第三方 SSE 数据并转发给客户端
// sendProcessedResponse(sseEmitter, uid, blockingVO);
// }
// }
// } catch (IOException e) {
// log.error("处理第三方 API 响应时发生错误", e);
// sseEmitter.completeWithError(e);
// } finally {
// if (!isClosed.get()) {
// sseEmitter.complete();
// }
// }
// }
// });
//
// return sseEmitter;
// }
//
// private void startHeartbeatThread(ResponseBodyEmitter sseEmitter, AtomicBoolean isClosed) {
//// ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
//// scheduler.scheduleAtFixedRate(() -> {
//// if (!isClosed.get()) {
//// try {
//// sseEmitter.send("data: " + "{}" + "\n\n");
//// } catch (IOException e) {
//// log.error("发送心跳消息时发生错误", e);
//// isClosed.set(true);
//// sseEmitter.completeWithError(e);
//// }
//// }
//// }, 0, 5, TimeUnit.SECONDS); // 每 20 秒发送一次心跳
// }
//
// private void sendProcessedResponse(ResponseBodyEmitter sseEmitter, String uid, DifyStreamVO blockingVO) throws IOException {
// try {
//
// Delta delta = new Delta();
// delta.setRole("assistant");
// delta.setContent(blockingVO.getAnswer());
//
// Choices choices = new Choices();
// choices.setFinish_reason(null);
// choices.setIndex(0);
// choices.setDelta(delta);
//
// List<Choices> list = new ArrayList<>();
// list.add(choices);
//
// DoubaoChatResponseDTO responseDTO = new DoubaoChatResponseDTO();
// responseDTO.setId(uid);
// responseDTO.setCreated(System.currentTimeMillis());
// responseDTO.setObject("chat.completion.chunk");
// responseDTO.setModel("CustomLLM");
// responseDTO.setChoices(list);
// log.info("sendProcessedResponse "+JSON.toJSONString(responseDTO));
//
// sseEmitter.send("data: " + JSON.toJSONString(responseDTO) + "\n\n");
// }catch (IOException e) {
// log.error("发送数据时发生错误", e);
// // 在发生错误时,尝试关闭连接
// sseEmitter.completeWithError(e);
// }
//
// }
//
// private void sendCompletionMessage(ResponseBodyEmitter sseEmitter, String uid) throws IOException {
// Choices choices = new Choices();
// choices.setFinish_reason("stop");
// choices.setIndex(0);
//
// Usage usage = new Usage();
// usage.setPrompt_tokens(1);
// usage.setCompletion_tokens(2);
// usage.setTotal_tokens(3);
//
// List<Choices> list = new ArrayList<>();
// list.add(choices);
//
// DoubaoChatResponseDTO responseDTO = new DoubaoChatResponseDTO();
// responseDTO.setId(uid);
// responseDTO.setCreated(System.currentTimeMillis());
// responseDTO.setObject("chat.completion.chunk");
// responseDTO.setModel("CustomLLM");
// responseDTO.setChoices(list);
// responseDTO.setUsage(usage);
//
// sseEmitter.send("data: " + JSON.toJSONString(responseDTO) + "\n\n");
// sseEmitter.send("data: [DONE]\n\n");
// }
//}
package com.ewaytek.deepseek.service.impl;
import cn.hutool.core.collection.CollectionUtil;
import cn.hutool.http.ContentType;
import cn.hutool.http.Header;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson2.JSON;
import com.ewaytek.deepseek.common.bean.base.ApiResponse;
import com.ewaytek.deepseek.config.DeepSeekConfig;
import com.ewaytek.deepseek.config.DifyConfig;
import com.ewaytek.deepseek.doadmin.dto.*;
import com.ewaytek.deepseek.doadmin.vo.MessageVO;
import com.ewaytek.deepseek.doadmin.vo.dify.DifyChatVO;
import com.ewaytek.deepseek.doadmin.vo.dify.DifyStreamVO;
import com.ewaytek.deepseek.service.AiChatService;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import javax.annotation.PostConstruct;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@Slf4j
@Service
public class AiChatServiceA implements AiChatService {
Map<String, Object> body = new HashMap<>();
Headers headers;
AiMessageDTO aiMessageDto;
@Autowired
private DeepSeekConfig deepSeekConfig;
@Autowired
private OkHttpClient httpClient;
@Autowired
private DifyConfig difyConfig;
// 初始化
@PostConstruct
public void init() {
body.put("model", deepSeekConfig.getModel());
body.put("frequency_penalty", deepSeekConfig.getFrequencyPenalty());
body.put("max_tokens", deepSeekConfig.getMaxTokens());
body.put("presence_penalty", deepSeekConfig.getPresencePenalty());
body.put("response_format", deepSeekConfig.getResponseFormat());
body.put("stop", deepSeekConfig.getStop());
body.put("stream", deepSeekConfig.getStream());
if (deepSeekConfig.getStream()) {
body.put("stream_options", deepSeekConfig.getStreamOptions());
}
body.put("temperature", deepSeekConfig.getTemperature());
body.put("top_p", deepSeekConfig.getTopP());
body.put("tools", deepSeekConfig.getTools());
body.put("tool_choice", deepSeekConfig.getToolChoice());
if (deepSeekConfig.getLogprobs()) {
body.put("logprobs", deepSeekConfig.getLogprobs());
body.put("top_logprobs", deepSeekConfig.getTopLogprobs());
}
// 构建请求头
headers = new Headers.Builder().add("Authorization", "Bearer " + deepSeekConfig.getApiKey()).add("Content-Type", "application/json").build();
if (deepSeekConfig.getStream()) {
aiMessageDto = new StreamingAiMessageDTO();
} else {
aiMessageDto = new DefaultAiMessageDTO();
}
}
@Override
public Flux<String> chat(List<Map<String, String>> question) {
body.put("messages", question);
// 将请求体转换为 JSON 字符串
ObjectMapper objectMapper = new ObjectMapper();
String payload;
try {
payload = objectMapper.writeValueAsString(body);
} catch (JsonProcessingException e) {
throw new RuntimeException("无法将负载转换为JSON", e);
}
// 构建请求
Request request = new Request.Builder().url(difyConfig.getApiHost()).post(RequestBody.create(payload, MediaType.parse("application/json"))).headers(headers).build();
// 返回 Flux<String> 以流式返回数据
return Flux.create(emitter -> {
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
emitter.error(new IOException("请求失败响应码: " + response.code()));
return;
}
// 处理流式响应
ResponseBody responseBody = response.body();
if (responseBody != null) {
while (!responseBody.source().exhausted()) {
String line = responseBody.source().readUtf8Line();
if (line != null && !line.isEmpty()) {
try {
emitter.next(aiMessageDto.getMessage(line)); // 逐步发送数据
} catch (Exception e) {
log.error("处理数据时发生错误: ", e);
}
}
}
emitter.complete(); // 完成流
}
} catch (IOException e) {
emitter.error(e);
log.error(e.getMessage());
}
});
}
@Override
public ApiResponse<MessageVO> msgChat(List<MessageDTO> messages) {
MessageChatDTO messageChatDTO = new MessageChatDTO();
messageChatDTO.setModel(deepSeekConfig.getModel());
messageChatDTO.setMessages(messages);
String json = JSON.toJSONString(messageChatDTO);
log.info("msgChat" + json);
HttpRequest request = HttpRequest.post(deepSeekConfig.getUrl())
.header(Header.AUTHORIZATION, "Bearer " + deepSeekConfig.getApiKey())
.header(Header.CONTENT_TYPE, "application/json")
.body(json)
.timeout(60000);
// Request request = new Request.Builder()
// .url(deepSeekConfig.getUrl())
// .post(RequestBody.create(json, MediaType.parse("application/json")))
// .headers(headers)
// .build();
// 执行请求并处理响应
try (HttpResponse response = request.execute()) {
if (response.isOk()) {
String responseBody = response.body();
log.info("Response: {}", responseBody);
return ApiResponse.success(JSON.parseObject(responseBody, MessageVO.class));
} else {
log.error("Request failed with status code: {}", response.getStatus());
return ApiResponse.fail("Request failed with status code: " + response.getStatus());
}
} catch (Exception e) {
log.error("Error during request: ", e);
return ApiResponse.fail("Error during request: " + e.getMessage());
}
}
}
package com.ewaytek.deepseek.task;
import cn.hutool.http.ContentType;
import cn.hutool.http.Header;
import cn.hutool.http.HttpRequest;
import cn.hutool.http.HttpResponse;
import com.alibaba.fastjson2.JSON;
import com.ewaytek.deepseek.config.DifyConfig;
import com.ewaytek.deepseek.doadmin.dto.dify.DIfyImportVerifyDTO;
import com.ewaytek.deepseek.doadmin.dto.dify.DifyChatBlockIngDTO;
import com.ewaytek.deepseek.doadmin.vo.dify.BlockingVO;
import com.ewaytek.deepseek.doadmin.vo.dify.DifyStreamVO;
import com.ewaytek.deepseek.doadmin.vo.dify.MetadataVO;
import com.ewaytek.deepseek.doadmin.vo.dify.RetrieverResources;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.apache.commons.collections4.CollectionUtils;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.text.DecimalFormat;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Date;
import java.util.List;
import java.util.function.Consumer;
/**
* @author yangtq
* @date 2025/2/28
*/
@Slf4j
public class DifyThread implements Runnable{
private DIfyImportVerifyDTO data;
private DifyConfig difyConfig;
private static final RateLimiter rateLimiter = RateLimiter.create(5.0); // 每秒最多 5 次调用
private OkHttpClient httpClient;
// 静态共享集合,用于存储所有任务的结果
private static final List<DIfyImportVerifyDTO> results = Collections.synchronizedList(new ArrayList<>());
private final Consumer<DIfyImportVerifyDTO> callback;
public DifyThread(DIfyImportVerifyDTO data, DifyConfig difyConfig, OkHttpClient httpClient, Consumer<DIfyImportVerifyDTO> callback){
this.data = data;
this.difyConfig = difyConfig;
this.httpClient=httpClient;
this.callback = callback;
}
@Override
public void run() {
try {
rateLimiter.acquire(); // 获取令牌,阻塞等待直到获取成功
// 调用大模型处理数据
DIfyImportVerifyDTO result = processBatch(data);
if(callback!=null){
callback.accept(result); // 调用回调
}
// 处理结果
System.out.println("Processed result: " + result.getAnswer());
} catch (Exception e) {
System.err.println("Error processing data: " + e.getMessage());
}
}
private DIfyImportVerifyDTO processBatch(DIfyImportVerifyDTO dto) throws Exception {
DifyChatBlockIngDTO difyChatDTO = new DifyChatBlockIngDTO();
difyChatDTO.setQuery(dto.getQuestion());
difyChatDTO.setUser("ewaytek"+System.currentTimeMillis());
difyChatDTO.setResponseMode("streaming");
//构建请求参数json数据
ObjectMapper mapper = new ObjectMapper();
String requestBody = mapper.writeValueAsString(difyChatDTO);
Headers headers = new Headers.Builder().add("Authorization", "Bearer " + difyConfig.getApiKey()).add("Content-Type", "application/json").build();
Request request = new Request.Builder().url(difyConfig.getApiHost() + "chat-messages").post(RequestBody.create(MediaType.parse(ContentType.JSON.getValue()), requestBody))
.headers(headers).build();
List<RetrieverResources> retrieverResourcesList=new ArrayList<>();
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
return dto;
}
// 处理流式响应
ResponseBody responseBody = response.body();
if (responseBody != null) {
StringBuilder responseBuilder = new StringBuilder();
while (!responseBody.source().exhausted()) {
String line = responseBody.source().readUtf8Line();
if (line != null && !line.isEmpty()) {
if (line.startsWith("data:")) { // 处理 SSE 格式
String eventData = line.substring(5).trim(); // 去掉 "data:" 前缀
DifyStreamVO blockingVO = JSON.parseObject(eventData, DifyStreamVO.class);
MetadataVO metadataVO= blockingVO.getMetadata();
if(metadataVO!=null){
if (!CollectionUtils.isEmpty(metadataVO.getRetrieverResources())) {
retrieverResourcesList.addAll(metadataVO.getRetrieverResources());
}
}
responseBuilder.append(blockingVO.getAnswer());
}
}
}
dto.setRetrieverResources(retrieverResourcesList);
dto.setAnswer(responseBuilder.toString());
}
} catch (IOException e) {
log.error(e.getMessage());
}
return dto;
}
}
package com.ewaytek.deepseek.task;
import cn.hutool.http.ContentType;
import com.alibaba.fastjson2.JSON;
import com.ewaytek.deepseek.config.DifyConfig;
import com.ewaytek.deepseek.doadmin.dto.dify.DIfyImportVerifyDTO;
import com.ewaytek.deepseek.doadmin.dto.dify.DifyChatBlockIngDTO;
import com.ewaytek.deepseek.doadmin.dto.dify.demo.DIfyWorkflowsResultDTO;
import com.ewaytek.deepseek.doadmin.dto.dify.demo.DifyWorkflowsDTO;
import com.ewaytek.deepseek.doadmin.vo.dify.DifyStreamVO;
import com.ewaytek.deepseek.doadmin.vo.dify.MetadataVO;
import com.ewaytek.deepseek.doadmin.vo.dify.RetrieverResources;
import com.ewaytek.deepseek.doadmin.vo.dify.workflows.WorkflowsExecVO;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.util.concurrent.RateLimiter;
import lombok.extern.slf4j.Slf4j;
import okhttp3.*;
import org.apache.commons.collections4.CollectionUtils;
import java.io.IOException;
import java.text.DecimalFormat;
import java.util.*;
import java.util.function.Consumer;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* @author yangtq
* @date 2025/3/6
*/
@Slf4j
public class DifyWorkflowsThread implements Runnable {
private DifyConfig difyConfig;
private static final RateLimiter rateLimiter = RateLimiter.create(5.0); // 每秒最多 5 次调用
private WorkflowsExecVO workflowsExecVO; // 用于存储处理结果
private OkHttpClient httpClient;
private static final List<DIfyImportVerifyDTO> results = Collections.synchronizedList(new ArrayList<>());
private DIfyImportVerifyDTO data;
public DifyWorkflowsThread(DIfyImportVerifyDTO data, DifyConfig difyConfig, OkHttpClient httpClient) {
this.data = data;
this.difyConfig = difyConfig;
this.httpClient = httpClient;
}
// 提供一个方法来获取处理后的 WorkflowsExecVO 对象
public WorkflowsExecVO getWorkflowsExecVO() {
return workflowsExecVO;
}
@Override
public void run() {
try {
rateLimiter.acquire(); // 获取令牌,阻塞等待直到获取成功
// 调用大模型处理数据
workflowsExecVO = new WorkflowsExecVO();
DIfyImportVerifyDTO result = processBatch(data);
workflowsExecVO.setQuestion(data.getQuestion());
workflowsExecVO.setStandard(data.getStandard());
workflowsExecVO.setAnswer(result.getAnswer());
String answer = cleanContent(result.getAnswer());
workflowsExecVO.setQuestion1(answer);
workflowsExecVO.setTime(result.getTime());
workflowsExecVO = DifyWorkflows(workflowsExecVO);
// 处理结果
System.out.println("Processed result: " + result.getAnswer());
} catch (Exception e) {
System.err.println("Error processing data: " + e.getMessage());
}
}
private DIfyImportVerifyDTO processBatch(DIfyImportVerifyDTO dto) throws Exception {
DifyChatBlockIngDTO difyChatDTO = new DifyChatBlockIngDTO();
difyChatDTO.setQuery(dto.getQuestion());
difyChatDTO.setUser("ewaytek");
difyChatDTO.setResponseMode("streaming");
//构建请求参数json数据
ObjectMapper mapper = new ObjectMapper();
String requestBody = mapper.writeValueAsString(difyChatDTO);
Headers headers = new Headers.Builder().add("Authorization", "Bearer " + "app-9D2IddTOvZDeBGLm1iNtQ0EF").add("Content-Type", "application/json").build();
Request request = new Request.Builder().url(difyConfig.getApiHost() + "chat-messages").post(RequestBody.create(MediaType.parse(ContentType.JSON.getValue()), requestBody))
.headers(headers).build();
List<RetrieverResources> retrieverResourcesList = new ArrayList<>();
Date start = new Date();
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
return dto;
}
// 处理流式响应
ResponseBody responseBody = response.body();
if (responseBody != null) {
StringBuilder responseBuilder = new StringBuilder();
while (!responseBody.source().exhausted()) {
String line = responseBody.source().readUtf8Line();
if (line != null && !line.isEmpty()) {
if (line.startsWith("data:")) { // 处理 SSE 格式
String eventData = line.substring(5).trim(); // 去掉 "data:" 前缀
DifyStreamVO blockingVO = JSON.parseObject(eventData, DifyStreamVO.class);
MetadataVO metadataVO = blockingVO.getMetadata();
if (metadataVO != null) {
if (!CollectionUtils.isEmpty(metadataVO.getRetrieverResources())) {
retrieverResourcesList.addAll(metadataVO.getRetrieverResources());
}
}
responseBuilder.append(blockingVO.getAnswer());
}
}
}
Date end = new Date();
String decimalFormat= new DecimalFormat("0.0000").format((end.getTime() - start.getTime())/1000);
dto.setRetrieverResources(retrieverResourcesList);
dto.setAnswer(responseBuilder.toString());
dto.setTime(decimalFormat);
}
} catch (IOException e) {
log.error(e.getMessage());
}
return dto;
}
public WorkflowsExecVO DifyWorkflows(WorkflowsExecVO workflowsExecVO) throws JsonProcessingException {
DifyWorkflowsDTO difyWorkflowsDTO = new DifyWorkflowsDTO();
difyWorkflowsDTO.setUser("abc-123");
difyWorkflowsDTO.setResponseMode("streaming");
Map<String, String> inputs = new HashMap<>();
inputs.put("question", workflowsExecVO.getQuestion1());
difyWorkflowsDTO.setInputs(inputs);
ObjectMapper mapper = new ObjectMapper();
String requestBody = mapper.writeValueAsString(difyWorkflowsDTO);
Headers headers = new Headers.Builder().add("Authorization", "Bearer " + "app-9RuAoErdMXkG4jVN0YNjXekp").add("Content-Type", "application/json").build();
Request request = new Request.Builder().url(difyConfig.getApiHost() + "workflows/run").post(okhttp3.RequestBody.create(MediaType.parse(ContentType.JSON.getValue()), requestBody))
.headers(headers).build();
List<WorkflowsExecVO.Workflows> workflowsList = new ArrayList<>();
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
log.info("DifyWorkflows#" + response.message());
return workflowsExecVO;
}
// 处理流式响应
ResponseBody responseBody = response.body();
if (responseBody != null) {
while (!responseBody.source().exhausted()) {
String line = responseBody.source().readUtf8Line();
if (line == null || line.isEmpty()) {
continue;
}
if (line.startsWith("data:")) { // 处理 SSE 格式
log.info("DifyWorkflows##" + line);
String eventData = line.substring(5).trim(); // 去掉 "data:" 前缀
DIfyWorkflowsResultDTO blockingVO = JSON.parseObject(eventData, DIfyWorkflowsResultDTO.class);
if (!"workflow_finished".equals(blockingVO.getEvent())) {
continue;
}
DIfyWorkflowsResultDTO.Outputs metadataVO = blockingVO.getData().getOutputs();
if (metadataVO == null) {
continue;
}
if (CollectionUtils.isEmpty(metadataVO.getResult())) {
continue;
}
for (DIfyWorkflowsResultDTO.ResultItem resultItem : metadataVO.getResult()) {
WorkflowsExecVO.Workflows workflows = new WorkflowsExecVO.Workflows();
workflows.setReliability(String.valueOf(resultItem.getMetadata().getScore()));
workflows.setAnswer(resultItem.getContent());
workflowsList.add(workflows);
}
}
}
workflowsExecVO.setWorkflowsList(workflowsList);
}
} catch (IOException e) {
log.error(e.getMessage());
}
return workflowsExecVO;
}
public static String cleanContent(String content) {
// 移除 <details> 标签及其内容
Pattern detailsPattern = Pattern.compile("<details.*?</details>", Pattern.DOTALL);
Matcher detailsMatcher = detailsPattern.matcher(content);
content = detailsMatcher.replaceAll("").trim();
// 移除尾部的 null
Pattern nullPattern = Pattern.compile("null$", Pattern.DOTALL);
Matcher nullMatcher = nullPattern.matcher(content);
content = nullMatcher.replaceAll("").trim();
// 移除多余的换行符和空格
Pattern whitespacePattern = Pattern.compile("\\s+\\n", Pattern.DOTALL);
Matcher whitespaceMatcher = whitespacePattern.matcher(content);
content = whitespaceMatcher.replaceAll("\n").trim();
return content;
}
}
# 项目相关配置
robot:
# 名称
name: deepseek
# 版本
version: 3.8.8
# 版权年份
copyrightYear: 2025
# 文件路径 示例( Windows配置D:/robot/uploadPath,Linux配置 /home/robot/uploadPath)
# profile: /data/ewaytek
profile: /Users/yang/Downloads
# 获取ip地址开关
addressEnabled: false
# 验证码类型 math 数字计算 char 字符验证
captchaType: math
spring:
application:
name: deepSeek
mvc:
async:
request-timeout: 300000
server:
port: 26061
tomcat:
max-threads: 500
uri-encoding: utf-8
# 这个参数定义了Tomcat服务器在任何给定时间可以接受和处理的最大连接数。如果连接数超过了这个值,新的连接可能会被拒绝
max-connections: 1000
# 这个参数定义了当所有可能的请求处理线程都在使用时,传入连接请求的最大队列长度。如果队列也满了,新的连接请求可能会被拒绝
accept-count: 200
# 这个参数定义了连接器在接受连接后等待显示请求URI行的时间(毫秒)。如果超过这个时间,连接可能会被关闭
connection-timeout: 120000
# 这个参数定义了在一个连接上可以发送的最大HTTP请求数量。超过这个数量后,服务器将关闭连接
max-keep-alive-requests: 100
servlet:
context-path: /
http:
encoding:
charset: utf-8
enabled: true
force: true
session:
timeout: 1800
tomcat:
connection-timeout: 1800000 # 设置连接超时时间为1800000毫秒(30分钟)
# 详细信息请访问下面的链接
# https://api-docs.deepseek.com/zh-cn/api/create-chat-completion
deepseek:
url: http://10.74.72.24:3001/v1/chat/completions
# 你的API key
api_key: sk-1vRj9WeCuIJITRTw1c4844Ab75Af4c9cBa639c47A5Ee3323
# 使用的模型的 ID。您可以使用 deepseek-chat。
model: deepseek-r1:14b
# 介于 -2.0 和 2.0 之间的数字。如果该值为正,那么新 token 会根据其在已有文本中的出现频率受到相应的惩罚,降低模型重复相同内容的可能性。
frequency_penalty: 0
# 介于 1 到 8192 间的整数,限制一次请求中模型生成 completion 的最大 token 数。输入 token 和输出 token 的总长度受模型的上下文长度的限制。
max_tokens: 4096
# 介于 -2.0 和 2.0 之间的数字。如果该值为正,那么新 token 会根据其是否已在已有文本中出现受到相应的惩罚,从而增加模型谈论新主题的可能性。
presence_penalty: 0
# 一个 object,指定模型必须输出的格式。
response_format:
type: text
# 一个 string 或最多包含 16 个 string 的 list,在遇到这些词时,API 将停止生成更多的 token。
stop: null
# 如果设置为 True,将会以 SSE(server-sent events)的形式以流式发送消息增量。消息流以 data: [DONE] 结尾。
stream: true
# 流式输出相关选项。只有在 stream 参数为 true 时,才可设置此参数。
stream_options:
include_usage: false
# 采样温度,介于 0 和 2 之间。更高的值,如 0.8,会使输出更随机,而更低的值,如 0.2,会使其更加集中和确定。
temperature: 1
# 作为调节采样温度的替代方案,模型会考虑前 top_p 概率的 token 的结果。所以 0.1 就意味着只有包括在最高 10% 概率中的 token 会被考虑。
top_p: 1
# 模型可能会调用的 tool 的列表。目前,仅支持 function 作为工具。
tools: []
# 控制模型调用 tool 的行为。
tool_choice: none
# 是否返回所输出 token 的对数概率。如果为 true,则在 message 的 content 中返回每个输出 token 的对数概率。
logprobs: false
# 一个介于 0 到 20 之间的整数 N,指定每个输出位置返回输出概率 top N 的 token,且返回这些 token 的对数概率。指定此参数时,logprobs 必须为 true。
top_logprobs: 10
dify:
# api_key: app-i3dujtr9ImSN5jqwx7u8nUkJ
api_key: app-adP42NC6IWmxJzaaaJ0LNSZF
api_key_tts: app-adP42NC6IWmxJzaaaJ0LNSZF
api_host: http://172.27.30.123/v1/
conversation_url: http://172.27.30.123/api/conversations
tts:
appKey: 0SYXUBehQc9NQ3ZQ
accessKeyId: LTAI5tJtpY5u4omeM2CKcvGy
accessKeySecret: ImT2N1IfOkBT0mfAI9VSnk53duyCXi
url:
voice: Aiyu
volume: 50
speech_rate: -50
pitch_rate: 0
longText: 0
# 项目相关配置
robot:
# 名称
name: deepseek
# 版本
version: 3.8.8
# 版权年份
copyrightYear: 2025
# 文件路径 示例( Windows配置D:/robot/uploadPath,Linux配置 /home/robot/uploadPath)
# profile: /data/ewaytek
profile: /Users/yang/Downloads
# 获取ip地址开关
addressEnabled: false
# 验证码类型 math 数字计算 char 字符验证
captchaType: math
spring:
application:
name: deepSeek
mvc:
async:
request-timeout: 300000
server:
port: 26061
tomcat:
max-threads: 500
uri-encoding: utf-8
# 这个参数定义了Tomcat服务器在任何给定时间可以接受和处理的最大连接数。如果连接数超过了这个值,新的连接可能会被拒绝
max-connections: 1000
# 这个参数定义了当所有可能的请求处理线程都在使用时,传入连接请求的最大队列长度。如果队列也满了,新的连接请求可能会被拒绝
accept-count: 200
# 这个参数定义了连接器在接受连接后等待显示请求URI行的时间(毫秒)。如果超过这个时间,连接可能会被关闭
connection-timeout: 120000
# 这个参数定义了在一个连接上可以发送的最大HTTP请求数量。超过这个数量后,服务器将关闭连接
max-keep-alive-requests: 100
servlet:
context-path: /
http:
encoding:
charset: utf-8
enabled: true
force: true
session:
timeout: 1800
tomcat:
connection-timeout: 1800000 # 设置连接超时时间为1800000毫秒(30分钟)
# 详细信息请访问下面的链接
# https://api-docs.deepseek.com/zh-cn/api/create-chat-completion
deepseek:
url: http://10.74.72.24:3001/v1/chat/completions
# 你的API key
api_key: sk-1vRj9WeCuIJITRTw1c4844Ab75Af4c9cBa639c47A5Ee3323
# 使用的模型的 ID。您可以使用 deepseek-chat。
model: deepseek-r1:14b
# 介于 -2.0 和 2.0 之间的数字。如果该值为正,那么新 token 会根据其在已有文本中的出现频率受到相应的惩罚,降低模型重复相同内容的可能性。
frequency_penalty: 0
# 介于 1 到 8192 间的整数,限制一次请求中模型生成 completion 的最大 token 数。输入 token 和输出 token 的总长度受模型的上下文长度的限制。
max_tokens: 4096
# 介于 -2.0 和 2.0 之间的数字。如果该值为正,那么新 token 会根据其是否已在已有文本中出现受到相应的惩罚,从而增加模型谈论新主题的可能性。
presence_penalty: 0
# 一个 object,指定模型必须输出的格式。
response_format:
type: text
# 一个 string 或最多包含 16 个 string 的 list,在遇到这些词时,API 将停止生成更多的 token。
stop: null
# 如果设置为 True,将会以 SSE(server-sent events)的形式以流式发送消息增量。消息流以 data: [DONE] 结尾。
stream: true
# 流式输出相关选项。只有在 stream 参数为 true 时,才可设置此参数。
stream_options:
include_usage: false
# 采样温度,介于 0 和 2 之间。更高的值,如 0.8,会使输出更随机,而更低的值,如 0.2,会使其更加集中和确定。
temperature: 1
# 作为调节采样温度的替代方案,模型会考虑前 top_p 概率的 token 的结果。所以 0.1 就意味着只有包括在最高 10% 概率中的 token 会被考虑。
top_p: 1
# 模型可能会调用的 tool 的列表。目前,仅支持 function 作为工具。
tools: []
# 控制模型调用 tool 的行为。
tool_choice: none
# 是否返回所输出 token 的对数概率。如果为 true,则在 message 的 content 中返回每个输出 token 的对数概率。
logprobs: false
# 一个介于 0 到 20 之间的整数 N,指定每个输出位置返回输出概率 top N 的 token,且返回这些 token 的对数概率。指定此参数时,logprobs 必须为 true。
top_logprobs: 10
dify:
# api_key: app-i3dujtr9ImSN5jqwx7u8nUkJ
api_key: app-adP42NC6IWmxJzaaaJ0LNSZF
api_key_tts: app-adP42NC6IWmxJzaaaJ0LNSZF
api_host: http://172.27.30.123/v1/
conversation_url: http://172.27.30.123/api/conversations
tts:
appKey: 0SYXUBehQc9NQ3ZQ
accessKeyId: LTAI5tJtpY5u4omeM2CKcvGy
accessKeySecret: ImT2N1IfOkBT0mfAI9VSnk53duyCXi
url:
voice: Aiyu
volume: 50
speech_rate: -50
pitch_rate: 0
longText: 0
spring:
application:
name: deepSeek
profiles:
active:
- prod
servlet:
multipart:
max-file-size: 50MB
max-request-size: 50MB
[program:customer_deepseek]
command=/usr/local/java/jdk1.8.0_171/bin/java -jar /home/app/custmoer_deepseek/ewaytek-deepseek-web-0.0.1-SNAPSHOT.jar ; the program (relative uses PATH, can take args)
;process_name=%(program_name)s ; process_name expr (default %(program_name)s)
;numprocs=1 ; number of processes copies to start (def 1)
directory=/home/app/custmoer_deepseek ; directory to cwd to before exec (def no cwd)
;umask=022 ; umask for process (default None)
priority=909 ; the relative start priority (default 999)
autostart=true ; start at supervisord start (default: true)
autorestart=unexpected ; whether/when to restart (default: unexpected)
startsecs=1 ; number of secs prog must stay running (def. 1)
startretries=3 ; max # of serial start failures (default 3)
exitcodes=0,2 ; 'expected' exit codes for process (default 0,2)
stopsignal=QUIT ; signal used to kill process (default TERM)
stopwaitsecs=10 ; max num secs to wait b4 SIGKILL (default 10)
stopasgroup=false ; send stop signal to the UNIX process group (default false)
;killasgroup=false ; SIGKILL the UNIX process group (def false)
;user=chrism ; setuid to this UNIX account to run the program
;redirect_stderr=true ; redirect proc stderr to stdout (default false)
stdout_logfile=/home/app/custmoer_deepseek/logs/stdout.log ; stdout log path, NONE for none; default AUTO
stdout_logfile_maxbytes=100MB ; max # logfile bytes b4 rotation (default 50MB)
stdout_logfile_backups=10 ; # of stdout logfile backups (default 10)
stdout_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0)
stdout_events_enabled=false ; emit events on stdout writes (default false)
stderr_logfile=/home/app/custmoer_deepseek/logs/stderr.log ; stderr log path, NONE for none; default AUTO
stderr_logfile_maxbytes=1MB ; max # logfile bytes b4 rotation (default 50MB)
stderr_logfile_backups=10 ; # of stderr logfile backups (default 10)
stderr_capture_maxbytes=1MB ; number of bytes in 'capturemode' (default 0)
stderr_events_enabled=false ; emit events on stderr writes (default false)
;environment=A="1",B="2" ; process environment additions (def no adds)
;serverurl=AUTO
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment