Add RAG client
This commit is contained in:
@@ -1,28 +1,24 @@
|
||||
/*
|
||||
* Copyright 2025 Google. This software is provided as-is, without warranty or representation for any use or purpose.
|
||||
* Your use of it is subject to your agreement with Google.
|
||||
*/
|
||||
|
||||
package com.example.service.base;
|
||||
|
||||
import com.example.mapper.conversation.DialogflowRequestMapper;
|
||||
import com.example.mapper.conversation.DialogflowResponseMapper;
|
||||
import com.example.dto.dialogflow.base.DetectIntentRequestDTO;
|
||||
import com.example.dto.dialogflow.base.DetectIntentResponseDTO;
|
||||
import com.example.exception.DialogflowClientException;
|
||||
import com.example.mapper.conversation.DialogflowRequestMapper;
|
||||
import com.example.mapper.conversation.DialogflowResponseMapper;
|
||||
import com.google.api.gax.rpc.ApiException;
|
||||
import com.google.cloud.dialogflow.cx.v3.DetectIntentRequest;
|
||||
import com.google.cloud.dialogflow.cx.v3.QueryParameters;
|
||||
import com.google.cloud.dialogflow.cx.v3.SessionsClient;
|
||||
import com.google.cloud.dialogflow.cx.v3.SessionName;
|
||||
import com.google.cloud.dialogflow.cx.v3.SessionsClient;
|
||||
import com.google.cloud.dialogflow.cx.v3.SessionsSettings;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Mono;
|
||||
import javax.annotation.PreDestroy;
|
||||
import java.io.IOException;
|
||||
import java.util.Objects;
|
||||
import javax.annotation.PreDestroy;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.stereotype.Service;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.retry.Retry;
|
||||
|
||||
/**
|
||||
@@ -32,9 +28,12 @@ import reactor.util.retry.Retry;
|
||||
* all within a reactive programming context.
|
||||
*/
|
||||
@Service
|
||||
public class DialogflowClientService {
|
||||
@Qualifier("dialogflowClientService")
|
||||
public class DialogflowClientService implements IntentDetectionService {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(DialogflowClientService.class);
|
||||
private static final Logger logger = LoggerFactory.getLogger(
|
||||
DialogflowClientService.class
|
||||
);
|
||||
|
||||
private final String dialogflowCxProjectId;
|
||||
private final String dialogflowCxLocation;
|
||||
@@ -43,16 +42,20 @@ public class DialogflowClientService {
|
||||
private final DialogflowRequestMapper dialogflowRequestMapper;
|
||||
private final DialogflowResponseMapper dialogflowResponseMapper;
|
||||
private SessionsClient sessionsClient;
|
||||
|
||||
|
||||
public DialogflowClientService(
|
||||
|
||||
@org.springframework.beans.factory.annotation.Value("${dialogflow.cx.project-id}") String dialogflowCxProjectId,
|
||||
@org.springframework.beans.factory.annotation.Value("${dialogflow.cx.location}") String dialogflowCxLocation,
|
||||
@org.springframework.beans.factory.annotation.Value("${dialogflow.cx.agent-id}") String dialogflowCxAgentId,
|
||||
@org.springframework.beans.factory.annotation.Value(
|
||||
"${dialogflow.cx.project-id}"
|
||||
) String dialogflowCxProjectId,
|
||||
@org.springframework.beans.factory.annotation.Value(
|
||||
"${dialogflow.cx.location}"
|
||||
) String dialogflowCxLocation,
|
||||
@org.springframework.beans.factory.annotation.Value(
|
||||
"${dialogflow.cx.agent-id}"
|
||||
) String dialogflowCxAgentId,
|
||||
DialogflowRequestMapper dialogflowRequestMapper,
|
||||
DialogflowResponseMapper dialogflowResponseMapper)
|
||||
throws IOException {
|
||||
|
||||
DialogflowResponseMapper dialogflowResponseMapper
|
||||
) throws IOException {
|
||||
this.dialogflowCxProjectId = dialogflowCxProjectId;
|
||||
this.dialogflowCxLocation = dialogflowCxLocation;
|
||||
this.dialogflowCxAgentId = dialogflowCxAgentId;
|
||||
@@ -60,15 +63,28 @@ public class DialogflowClientService {
|
||||
this.dialogflowResponseMapper = dialogflowResponseMapper;
|
||||
|
||||
try {
|
||||
String regionalEndpoint = String.format("%s-dialogflow.googleapis.com:443", dialogflowCxLocation);
|
||||
String regionalEndpoint = String.format(
|
||||
"%s-dialogflow.googleapis.com:443",
|
||||
dialogflowCxLocation
|
||||
);
|
||||
SessionsSettings sessionsSettings = SessionsSettings.newBuilder()
|
||||
.setEndpoint(regionalEndpoint)
|
||||
.build();
|
||||
this.sessionsClient = SessionsClient.create(sessionsSettings);
|
||||
logger.info("Dialogflow CX SessionsClient initialized successfully for endpoint: {}", regionalEndpoint);
|
||||
logger.info("Dialogflow CX SessionsClient initialized successfully for agent - Test Agent version: {}", dialogflowCxAgentId);
|
||||
logger.info(
|
||||
"Dialogflow CX SessionsClient initialized successfully for endpoint: {}",
|
||||
regionalEndpoint
|
||||
);
|
||||
logger.info(
|
||||
"Dialogflow CX SessionsClient initialized successfully for agent - Test Agent version: {}",
|
||||
dialogflowCxAgentId
|
||||
);
|
||||
} catch (IOException e) {
|
||||
logger.error("Failed to create Dialogflow CX SessionsClient: {}", e.getMessage(), e);
|
||||
logger.error(
|
||||
"Failed to create Dialogflow CX SessionsClient: {}",
|
||||
e.getMessage(),
|
||||
e
|
||||
);
|
||||
throw e;
|
||||
}
|
||||
}
|
||||
@@ -81,36 +97,63 @@ public class DialogflowClientService {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public Mono<DetectIntentResponseDTO> detectIntent(
|
||||
String sessionId,
|
||||
DetectIntentRequestDTO request) {
|
||||
|
||||
Objects.requireNonNull(sessionId, "Dialogflow session ID cannot be null.");
|
||||
Objects.requireNonNull(request, "Dialogflow request DTO cannot be null.");
|
||||
String sessionId,
|
||||
DetectIntentRequestDTO request
|
||||
) {
|
||||
Objects.requireNonNull(
|
||||
sessionId,
|
||||
"Dialogflow session ID cannot be null."
|
||||
);
|
||||
Objects.requireNonNull(
|
||||
request,
|
||||
"Dialogflow request DTO cannot be null."
|
||||
);
|
||||
|
||||
logger.info("Initiating detectIntent for session: {}", sessionId);
|
||||
|
||||
DetectIntentRequest.Builder detectIntentRequestBuilder;
|
||||
try {
|
||||
detectIntentRequestBuilder = dialogflowRequestMapper.mapToDetectIntentRequestBuilder(request);
|
||||
logger.debug("Obtained partial DetectIntentRequest.Builder from mapper for session: {}", sessionId);
|
||||
detectIntentRequestBuilder =
|
||||
dialogflowRequestMapper.mapToDetectIntentRequestBuilder(
|
||||
request
|
||||
);
|
||||
logger.debug(
|
||||
"Obtained partial DetectIntentRequest.Builder from mapper for session: {}",
|
||||
sessionId
|
||||
);
|
||||
} catch (IllegalArgumentException e) {
|
||||
logger.error(" Failed to map DTO to partial Protobuf request for session {}: {}", sessionId, e.getMessage());
|
||||
return Mono.error(new IllegalArgumentException("Invalid Dialogflow request input: " + e.getMessage()));
|
||||
logger.error(
|
||||
" Failed to map DTO to partial Protobuf request for session {}: {}",
|
||||
sessionId,
|
||||
e.getMessage()
|
||||
);
|
||||
return Mono.error(
|
||||
new IllegalArgumentException(
|
||||
"Invalid Dialogflow request input: " + e.getMessage()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
SessionName sessionName = SessionName.newBuilder()
|
||||
.setProject(dialogflowCxProjectId)
|
||||
.setLocation(dialogflowCxLocation)
|
||||
.setAgent(dialogflowCxAgentId)
|
||||
.setSession(sessionId)
|
||||
.build();
|
||||
.setProject(dialogflowCxProjectId)
|
||||
.setLocation(dialogflowCxLocation)
|
||||
.setAgent(dialogflowCxAgentId)
|
||||
.setSession(sessionId)
|
||||
.build();
|
||||
detectIntentRequestBuilder.setSession(sessionName.toString());
|
||||
logger.debug("Set session path {} on the request builder for session: {}", sessionName.toString(), sessionId);
|
||||
logger.debug(
|
||||
"Set session path {} on the request builder for session: {}",
|
||||
sessionName.toString(),
|
||||
sessionId
|
||||
);
|
||||
|
||||
QueryParameters.Builder queryParamsBuilder;
|
||||
if (detectIntentRequestBuilder.hasQueryParams()) {
|
||||
queryParamsBuilder = detectIntentRequestBuilder.getQueryParams().toBuilder();
|
||||
queryParamsBuilder = detectIntentRequestBuilder
|
||||
.getQueryParams()
|
||||
.toBuilder();
|
||||
} else {
|
||||
queryParamsBuilder = QueryParameters.newBuilder();
|
||||
}
|
||||
@@ -118,50 +161,89 @@ public class DialogflowClientService {
|
||||
detectIntentRequestBuilder.setQueryParams(queryParamsBuilder.build());
|
||||
|
||||
// Build the final DetectIntentRequest Protobuf object
|
||||
DetectIntentRequest detectIntentRequest = detectIntentRequestBuilder.build();
|
||||
DetectIntentRequest detectIntentRequest =
|
||||
detectIntentRequestBuilder.build();
|
||||
return Mono.fromCallable(() -> {
|
||||
logger.debug("Calling Dialogflow CX detectIntent for session: {}", sessionId);
|
||||
logger.debug(
|
||||
"Calling Dialogflow CX detectIntent for session: {}",
|
||||
sessionId
|
||||
);
|
||||
return sessionsClient.detectIntent(detectIntentRequest);
|
||||
})
|
||||
.retryWhen(
|
||||
reactor.util.retry.Retry.backoff(
|
||||
3,
|
||||
java.time.Duration.ofSeconds(1)
|
||||
)
|
||||
.filter(throwable -> {
|
||||
if (throwable instanceof ApiException apiException) {
|
||||
com.google.api.gax.rpc.StatusCode.Code code =
|
||||
apiException.getStatusCode().getCode();
|
||||
boolean isRetryable =
|
||||
code ==
|
||||
com.google.api.gax.rpc.StatusCode.Code.INTERNAL ||
|
||||
code ==
|
||||
com.google.api.gax.rpc.StatusCode.Code.UNAVAILABLE;
|
||||
if (isRetryable) {
|
||||
logger.warn(
|
||||
"Retrying Dialogflow CX call for session {} due to transient error: {}",
|
||||
sessionId,
|
||||
code
|
||||
);
|
||||
}
|
||||
return isRetryable;
|
||||
}
|
||||
return false;
|
||||
})
|
||||
.doBeforeRetry(retrySignal ->
|
||||
logger.debug(
|
||||
"Retry attempt #{} for session {}",
|
||||
retrySignal.totalRetries() + 1,
|
||||
sessionId
|
||||
)
|
||||
)
|
||||
.onRetryExhaustedThrow((retrySpec, retrySignal) -> {
|
||||
logger.error(
|
||||
"Dialogflow CX retries exhausted for session {}",
|
||||
sessionId
|
||||
);
|
||||
return retrySignal.failure();
|
||||
})
|
||||
)
|
||||
.onErrorMap(ApiException.class, e -> {
|
||||
String statusCode = e.getStatusCode().getCode().name();
|
||||
String message = e.getMessage();
|
||||
String detailedLog = message;
|
||||
|
||||
.retryWhen(reactor.util.retry.Retry.backoff(3, java.time.Duration.ofSeconds(1))
|
||||
.filter(throwable -> {
|
||||
if (throwable instanceof ApiException apiException) {
|
||||
com.google.api.gax.rpc.StatusCode.Code code = apiException.getStatusCode().getCode();
|
||||
boolean isRetryable = code == com.google.api.gax.rpc.StatusCode.Code.INTERNAL ||
|
||||
code == com.google.api.gax.rpc.StatusCode.Code.UNAVAILABLE;
|
||||
if (isRetryable) {
|
||||
logger.warn("Retrying Dialogflow CX call for session {} due to transient error: {}", sessionId, code);
|
||||
}
|
||||
return isRetryable;
|
||||
if (
|
||||
e.getCause() instanceof
|
||||
io.grpc.StatusRuntimeException grpcEx
|
||||
) {
|
||||
detailedLog = String.format(
|
||||
"Status: %s, Message: %s, Trailers: %s",
|
||||
grpcEx.getStatus().getCode(),
|
||||
grpcEx.getStatus().getDescription(),
|
||||
grpcEx.getTrailers()
|
||||
);
|
||||
}
|
||||
return false;
|
||||
})
|
||||
.doBeforeRetry(retrySignal -> logger.debug("Retry attempt #{} for session {}",
|
||||
retrySignal.totalRetries() + 1, sessionId))
|
||||
.onRetryExhaustedThrow((retrySpec, retrySignal) -> {
|
||||
logger.error("Dialogflow CX retries exhausted for session {}", sessionId);
|
||||
return retrySignal.failure();
|
||||
})
|
||||
)
|
||||
.onErrorMap(ApiException.class, e -> {
|
||||
String statusCode = e.getStatusCode().getCode().name();
|
||||
String message = e.getMessage();
|
||||
String detailedLog = message;
|
||||
|
||||
if (e.getCause() instanceof io.grpc.StatusRuntimeException grpcEx) {
|
||||
detailedLog = String.format("Status: %s, Message: %s, Trailers: %s",
|
||||
grpcEx.getStatus().getCode(),
|
||||
grpcEx.getStatus().getDescription(),
|
||||
grpcEx.getTrailers());
|
||||
}
|
||||
|
||||
logger.error("Dialogflow CX API error for session {}: details={}",
|
||||
sessionId, detailedLog, e);
|
||||
logger.error(
|
||||
"Dialogflow CX API error for session {}: details={}",
|
||||
sessionId,
|
||||
detailedLog,
|
||||
e
|
||||
);
|
||||
|
||||
return new DialogflowClientException(
|
||||
"Dialogflow CX API error: " + statusCode + " - " + message, e);
|
||||
})
|
||||
.map(dfResponse -> this.dialogflowResponseMapper.mapFromDialogflowResponse(dfResponse, sessionId));
|
||||
return new DialogflowClientException(
|
||||
"Dialogflow CX API error: " + statusCode + " - " + message,
|
||||
e
|
||||
);
|
||||
})
|
||||
.map(dfResponse ->
|
||||
this.dialogflowResponseMapper.mapFromDialogflowResponse(
|
||||
dfResponse,
|
||||
sessionId
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,24 @@
|
||||
package com.example.service.base;
|
||||
|
||||
import com.example.dto.dialogflow.base.DetectIntentRequestDTO;
|
||||
import com.example.dto.dialogflow.base.DetectIntentResponseDTO;
|
||||
import reactor.core.publisher.Mono;
|
||||
|
||||
/**
|
||||
* Common interface for intent detection services.
|
||||
* This abstraction allows switching between different intent detection implementations
|
||||
* (e.g., Dialogflow, RAG) without changing dependent services.
|
||||
*/
|
||||
public interface IntentDetectionService {
|
||||
/**
|
||||
* Detects user intent and generates a response.
|
||||
*
|
||||
* @param sessionId The session identifier for this conversation
|
||||
* @param request The request containing user input and context parameters
|
||||
* @return A Mono of DetectIntentResponseDTO with the generated response
|
||||
*/
|
||||
Mono<DetectIntentResponseDTO> detectIntent(
|
||||
String sessionId,
|
||||
DetectIntentRequestDTO request
|
||||
);
|
||||
}
|
||||
274
src/main/java/com/example/service/base/RagClientService.java
Normal file
274
src/main/java/com/example/service/base/RagClientService.java
Normal file
@@ -0,0 +1,274 @@
|
||||
package com.example.service.base;
|
||||
|
||||
import com.example.dto.dialogflow.base.DetectIntentRequestDTO;
|
||||
import com.example.dto.dialogflow.base.DetectIntentResponseDTO;
|
||||
import com.example.dto.rag.RagQueryRequest;
|
||||
import com.example.dto.rag.RagQueryResponse;
|
||||
import com.example.exception.RagClientException;
|
||||
import com.example.mapper.rag.RagRequestMapper;
|
||||
import com.example.mapper.rag.RagResponseMapper;
|
||||
import java.time.Duration;
|
||||
import java.util.Objects;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.beans.factory.annotation.Value;
|
||||
import org.springframework.http.HttpStatusCode;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.web.reactive.function.client.WebClient;
|
||||
import org.springframework.web.reactive.function.client.WebClientRequestException;
|
||||
import org.springframework.web.reactive.function.client.WebClientResponseException;
|
||||
import reactor.core.publisher.Mono;
|
||||
import reactor.util.retry.Retry;
|
||||
|
||||
/**
|
||||
* Service for interacting with the RAG server to detect user intent and generate responses.
|
||||
* This service mirrors the structure of DialogflowClientService but calls a RAG API instead.
|
||||
* It maintains the same method signatures and reactive patterns for seamless integration.
|
||||
*/
|
||||
@Service
|
||||
@Qualifier("ragClientService")
|
||||
public class RagClientService implements IntentDetectionService {
|
||||
|
||||
private static final Logger logger = LoggerFactory.getLogger(
|
||||
RagClientService.class
|
||||
);
|
||||
|
||||
private final WebClient webClient;
|
||||
private final RagRequestMapper ragRequestMapper;
|
||||
private final RagResponseMapper ragResponseMapper;
|
||||
private final int maxRetries;
|
||||
private final Duration retryBackoff;
|
||||
private final Duration timeout;
|
||||
|
||||
public RagClientService(
|
||||
@Value("${rag.server.url}") String ragServerUrl,
|
||||
@Value("${rag.server.timeout:30s}") Duration timeout,
|
||||
@Value("${rag.server.retry.max-attempts:3}") int maxRetries,
|
||||
@Value("${rag.server.retry.backoff:1s}") Duration retryBackoff,
|
||||
@Value("${rag.server.api-key:}") String apiKey,
|
||||
RagRequestMapper ragRequestMapper,
|
||||
RagResponseMapper ragResponseMapper
|
||||
) {
|
||||
this.ragRequestMapper = ragRequestMapper;
|
||||
this.ragResponseMapper = ragResponseMapper;
|
||||
this.maxRetries = maxRetries;
|
||||
this.retryBackoff = retryBackoff;
|
||||
this.timeout = timeout;
|
||||
|
||||
// Build WebClient with base URL and optional API key
|
||||
WebClient.Builder builder = WebClient.builder()
|
||||
.baseUrl(ragServerUrl)
|
||||
.defaultHeader("Content-Type", "application/json");
|
||||
|
||||
// Add API key header if provided
|
||||
if (apiKey != null && !apiKey.isBlank()) {
|
||||
builder.defaultHeader("X-API-Key", apiKey);
|
||||
logger.info("RAG Client initialized with API key authentication");
|
||||
}
|
||||
|
||||
this.webClient = builder.build();
|
||||
|
||||
logger.info(
|
||||
"RAG Client initialized successfully with endpoint: {}",
|
||||
ragServerUrl
|
||||
);
|
||||
logger.info(
|
||||
"RAG Client configuration - timeout: {}, max retries: {}, backoff: {}",
|
||||
timeout,
|
||||
maxRetries,
|
||||
retryBackoff
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Detects user intent by calling the RAG server.
|
||||
* This method signature matches DialogflowClientService.detectIntent() for compatibility.
|
||||
*
|
||||
* @param sessionId The session identifier (used for logging, not sent to RAG)
|
||||
* @param request The DetectIntentRequestDTO containing user input and parameters
|
||||
* @return A Mono of DetectIntentResponseDTO with the RAG-generated response
|
||||
*/
|
||||
@Override
|
||||
public Mono<DetectIntentResponseDTO> detectIntent(
|
||||
String sessionId,
|
||||
DetectIntentRequestDTO request
|
||||
) {
|
||||
Objects.requireNonNull(sessionId, "Session ID cannot be null.");
|
||||
Objects.requireNonNull(request, "Request DTO cannot be null.");
|
||||
|
||||
logger.info("Initiating RAG query for session: {}", sessionId);
|
||||
|
||||
// Map DetectIntentRequestDTO to RAG format
|
||||
RagQueryRequest ragRequest;
|
||||
try {
|
||||
ragRequest = ragRequestMapper.mapToRagRequest(request, sessionId);
|
||||
logger.debug(
|
||||
"Successfully mapped request to RAG format for session: {}",
|
||||
sessionId
|
||||
);
|
||||
} catch (IllegalArgumentException e) {
|
||||
logger.error(
|
||||
"Failed to map DTO to RAG request for session {}: {}",
|
||||
sessionId,
|
||||
e.getMessage()
|
||||
);
|
||||
return Mono.error(
|
||||
new IllegalArgumentException(
|
||||
"Invalid RAG request input: " + e.getMessage()
|
||||
)
|
||||
);
|
||||
}
|
||||
|
||||
// Call RAG API
|
||||
return Mono.defer(
|
||||
() ->
|
||||
webClient
|
||||
.post()
|
||||
.uri("/api/v1/query")
|
||||
.header("X-Session-Id", sessionId) // Optional: for RAG server logging
|
||||
.bodyValue(ragRequest)
|
||||
.retrieve()
|
||||
.onStatus(HttpStatusCode::is4xxClientError, response ->
|
||||
response
|
||||
.bodyToMono(String.class)
|
||||
.flatMap(body -> {
|
||||
logger.error(
|
||||
"RAG client error for session {}: status={}, body={}",
|
||||
sessionId,
|
||||
response.statusCode(),
|
||||
body
|
||||
);
|
||||
return Mono.error(
|
||||
new RagClientException(
|
||||
"Invalid RAG request: " +
|
||||
response.statusCode() +
|
||||
" - " +
|
||||
body
|
||||
)
|
||||
);
|
||||
})
|
||||
)
|
||||
.bodyToMono(RagQueryResponse.class)
|
||||
.timeout(timeout) // Timeout per attempt
|
||||
)
|
||||
.retryWhen(
|
||||
Retry.backoff(maxRetries, retryBackoff)
|
||||
.filter(throwable -> {
|
||||
// Retry on server errors and timeouts
|
||||
if (
|
||||
throwable instanceof WebClientResponseException wce
|
||||
) {
|
||||
int statusCode = wce.getStatusCode().value();
|
||||
boolean isRetryable =
|
||||
statusCode == 500 ||
|
||||
statusCode == 503 ||
|
||||
statusCode == 504;
|
||||
if (isRetryable) {
|
||||
logger.warn(
|
||||
"Retrying RAG call for session {} due to status code: {}",
|
||||
sessionId,
|
||||
statusCode
|
||||
);
|
||||
}
|
||||
return isRetryable;
|
||||
}
|
||||
if (throwable instanceof TimeoutException) {
|
||||
logger.warn(
|
||||
"Retrying RAG call for session {} due to timeout",
|
||||
sessionId
|
||||
);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
})
|
||||
.doBeforeRetry(retrySignal ->
|
||||
logger.debug(
|
||||
"Retry attempt #{} for session {}: {}",
|
||||
retrySignal.totalRetries() + 1,
|
||||
sessionId,
|
||||
retrySignal.failure().getMessage()
|
||||
)
|
||||
)
|
||||
.onRetryExhaustedThrow((retrySpec, retrySignal) -> {
|
||||
logger.error(
|
||||
"RAG retries exhausted for session {}",
|
||||
sessionId
|
||||
);
|
||||
return retrySignal.failure();
|
||||
})
|
||||
)
|
||||
.onErrorMap(WebClientResponseException.class, e -> {
|
||||
int statusCode = e.getStatusCode().value();
|
||||
logger.error(
|
||||
"RAG server error for session {}: status={}, body={}",
|
||||
sessionId,
|
||||
statusCode,
|
||||
e.getResponseBodyAsString()
|
||||
);
|
||||
return new RagClientException(
|
||||
"RAG server error: " +
|
||||
statusCode +
|
||||
" - " +
|
||||
e.getResponseBodyAsString(),
|
||||
e
|
||||
);
|
||||
})
|
||||
.onErrorMap(WebClientRequestException.class, e -> {
|
||||
logger.error(
|
||||
"RAG connection error for session {}: {}",
|
||||
sessionId,
|
||||
e.getMessage()
|
||||
);
|
||||
return new RagClientException(
|
||||
"RAG connection failed: " + e.getMessage(),
|
||||
e
|
||||
);
|
||||
})
|
||||
.onErrorMap(TimeoutException.class, e -> {
|
||||
logger.error(
|
||||
"RAG timeout for session {}: {}",
|
||||
sessionId,
|
||||
e.getMessage()
|
||||
);
|
||||
return new RagClientException(
|
||||
"RAG request timeout after " + timeout.getSeconds() + "s",
|
||||
e
|
||||
);
|
||||
})
|
||||
.onErrorMap(RagClientException.class, e -> e) // Pass through RagClientException
|
||||
.onErrorMap(
|
||||
throwable -> !(throwable instanceof RagClientException),
|
||||
throwable -> {
|
||||
logger.error(
|
||||
"Unexpected error during RAG call for session {}: {}",
|
||||
sessionId,
|
||||
throwable.getMessage(),
|
||||
throwable
|
||||
);
|
||||
return new RagClientException(
|
||||
"Unexpected RAG error: " + throwable.getMessage(),
|
||||
throwable
|
||||
);
|
||||
}
|
||||
)
|
||||
.map(ragResponse ->
|
||||
ragResponseMapper.mapFromRagResponse(ragResponse, sessionId)
|
||||
)
|
||||
.doOnSuccess(response ->
|
||||
logger.info(
|
||||
"RAG query successful for session: {}, response ID: {}",
|
||||
sessionId,
|
||||
response.responseId()
|
||||
)
|
||||
)
|
||||
.doOnError(error ->
|
||||
logger.error(
|
||||
"RAG query failed for session {}: {}",
|
||||
sessionId,
|
||||
error.getMessage()
|
||||
)
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -14,7 +14,7 @@ import com.example.mapper.conversation.ConversationEntryMapper;
|
||||
import com.example.mapper.conversation.ExternalConvRequestMapper;
|
||||
import com.example.mapper.messagefilter.ConversationContextMapper;
|
||||
import com.example.mapper.messagefilter.NotificationContextMapper;
|
||||
import com.example.service.base.DialogflowClientService;
|
||||
import com.example.service.base.IntentDetectionService;
|
||||
import com.example.service.base.MessageEntryFilter;
|
||||
import com.example.service.base.NotificationContextResolver;
|
||||
import com.example.service.notification.MemoryStoreNotificationService;
|
||||
@@ -67,7 +67,7 @@ public class ConversationManagerService {
|
||||
private static final String CONV_HISTORY_PARAM = "conversation_history";
|
||||
private static final String HISTORY_PARAM = "historial";
|
||||
private final ExternalConvRequestMapper externalRequestToDialogflowMapper;
|
||||
private final DialogflowClientService dialogflowServiceClient;
|
||||
private final IntentDetectionService intentDetectionService;
|
||||
private final FirestoreConversationService firestoreConversationService;
|
||||
private final MemoryStoreConversationService memoryStoreConversationService;
|
||||
private final QuickRepliesManagerService quickRepliesManagerService;
|
||||
@@ -83,7 +83,7 @@ public class ConversationManagerService {
|
||||
private final ConversationEntryMapper conversationEntryMapper;
|
||||
|
||||
public ConversationManagerService(
|
||||
DialogflowClientService dialogflowServiceClient,
|
||||
IntentDetectionService intentDetectionService,
|
||||
FirestoreConversationService firestoreConversationService,
|
||||
MemoryStoreConversationService memoryStoreConversationService,
|
||||
ExternalConvRequestMapper externalRequestToDialogflowMapper,
|
||||
@@ -97,7 +97,7 @@ public class ConversationManagerService {
|
||||
LlmResponseTunerService llmResponseTunerService,
|
||||
ConversationEntryMapper conversationEntryMapper,
|
||||
@Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) {
|
||||
this.dialogflowServiceClient = dialogflowServiceClient;
|
||||
this.intentDetectionService = intentDetectionService;
|
||||
this.firestoreConversationService = firestoreConversationService;
|
||||
this.memoryStoreConversationService = memoryStoreConversationService;
|
||||
this.externalRequestToDialogflowMapper = externalRequestToDialogflowMapper;
|
||||
@@ -305,7 +305,7 @@ public class ConversationManagerService {
|
||||
finalSessionId))
|
||||
.doOnError(e -> logger.error("Error during user entry persistence for session {}: {}", finalSessionId,
|
||||
e.getMessage(), e))
|
||||
.then(Mono.defer(() -> dialogflowServiceClient.detectIntent(finalSessionId, request)
|
||||
.then(Mono.defer(() -> intentDetectionService.detectIntent(finalSessionId, request)
|
||||
.flatMap(response -> {
|
||||
logger.debug(
|
||||
"RTest eceived Dialogflow CX response for session {}. Initiating agent response persistence.",
|
||||
@@ -366,7 +366,7 @@ public class ConversationManagerService {
|
||||
request.queryParams())
|
||||
.withParameter("llm_reponse_uuid", uuid);
|
||||
|
||||
return dialogflowServiceClient.detectIntent(sessionId, newRequest)
|
||||
return intentDetectionService.detectIntent(sessionId, newRequest)
|
||||
.flatMap(response -> {
|
||||
ConversationEntryDTO agentEntry = ConversationEntryDTO
|
||||
.forAgent(response.queryResult());
|
||||
@@ -387,7 +387,7 @@ public class ConversationManagerService {
|
||||
.withParameters(notification.parametros());
|
||||
}
|
||||
return persistConversationTurn(session, conversationEntryMapper.toConversationMessageDTO(userEntry))
|
||||
.then(dialogflowServiceClient.detectIntent(sessionId, finalRequest)
|
||||
.then(intentDetectionService.detectIntent(sessionId, finalRequest)
|
||||
.flatMap(response -> {
|
||||
ConversationEntryDTO agentEntry = ConversationEntryDTO
|
||||
.forAgent(response.queryResult());
|
||||
|
||||
@@ -14,7 +14,7 @@ import com.example.dto.dialogflow.conversation.ConversationSessionDTO;
|
||||
import com.example.dto.dialogflow.notification.NotificationDTO;
|
||||
import com.example.mapper.conversation.ConversationEntryMapper;
|
||||
import com.example.mapper.notification.ExternalNotRequestMapper;
|
||||
import com.example.service.base.DialogflowClientService;
|
||||
import com.example.service.base.IntentDetectionService;
|
||||
import com.example.service.conversation.DataLossPrevention;
|
||||
import com.example.service.conversation.FirestoreConversationService;
|
||||
import com.example.service.conversation.MemoryStoreConversationService;
|
||||
@@ -36,7 +36,7 @@ public class NotificationManagerService {
|
||||
private static final String eventName = "notificacion";
|
||||
private static final String PREFIX_PO_PARAM = "notification_po_";
|
||||
|
||||
private final DialogflowClientService dialogflowClientService;
|
||||
private final IntentDetectionService intentDetectionService;
|
||||
private final FirestoreNotificationService firestoreNotificationService;
|
||||
private final MemoryStoreNotificationService memoryStoreNotificationService;
|
||||
private final ExternalNotRequestMapper externalNotRequestMapper;
|
||||
@@ -50,7 +50,7 @@ public class NotificationManagerService {
|
||||
private String defaultLanguageCode;
|
||||
|
||||
public NotificationManagerService(
|
||||
DialogflowClientService dialogflowClientService,
|
||||
IntentDetectionService intentDetectionService,
|
||||
FirestoreNotificationService firestoreNotificationService,
|
||||
MemoryStoreNotificationService memoryStoreNotificationService,
|
||||
MemoryStoreConversationService memoryStoreConversationService,
|
||||
@@ -60,8 +60,8 @@ public class NotificationManagerService {
|
||||
DataLossPrevention dataLossPrevention,
|
||||
ConversationEntryMapper conversationEntryMapper,
|
||||
@Value("${google.cloud.dlp.dlpTemplateCompleteFlow}") String dlpTemplateCompleteFlow) {
|
||||
|
||||
this.dialogflowClientService = dialogflowClientService;
|
||||
|
||||
this.intentDetectionService = intentDetectionService;
|
||||
this.firestoreNotificationService = firestoreNotificationService;
|
||||
this.memoryStoreNotificationService = memoryStoreNotificationService;
|
||||
this.externalNotRequestMapper = externalNotRequestMapper;
|
||||
@@ -147,7 +147,7 @@ public class NotificationManagerService {
|
||||
|
||||
DetectIntentRequestDTO detectIntentRequest = externalNotRequestMapper.map(obfuscatedRequest);
|
||||
|
||||
return dialogflowClientService.detectIntent(sessionId, detectIntentRequest);
|
||||
return intentDetectionService.detectIntent(sessionId, detectIntentRequest);
|
||||
})
|
||||
.doOnSuccess(response -> logger
|
||||
.info("Finished processing notification. Dialogflow response received for phone {}.", telefono))
|
||||
|
||||
Reference in New Issue
Block a user