Skip to main content
Became Hot Network Question
edited title
Link
yoda
  • 277
  • 1
  • 5

API having flows difficult to reason aboutflow - normal flow along with background threadspoller(it also involves events which is not shown here)

fix final? ```
Source Link
Peilonrayz
  • 44.5k
  • 7
  • 80
  • 157
```publicpublic class OrderService {

    private final OrderRequestRepository ceaseOrderRepo;
    private final AuditService auditService;
    private final OrderProcessor ;
    private final PollPlans plans;
    private final PollNotifier pollNotifier;

    @Value("${app.case.fetch-state-async.enabled:false}")
    private boolean fetchStateAsyncEnabled;

    @Value("${cease.future.timeout-seconds:10}")
    private int futureTimeoutSeconds;
    private final OrderRequestService orderRequestService;
    @Autowired
    private ServiceConfiguration config;
    @Autowired
    private ObjectMapper mapper;

    @Autowired
    private AmaServiceConnector amaServiceConnector;

    @Autowired
    private ServiceConfig ServiceConfig;

    @Value("${ama.case.enabled:false}")
    private boolean amaCaseEnabled;

    public CeaseLineResponse createCeaseOrder(String customerId, CreateAndSubmitSuspendResumeCeaseOrderRequest req,String sc,String lo) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.add(CEASE_LINE_TRACKING_ID,trackingId);
        httpHeaders.add(CEASE_LINE_ROUTE_ENV, ServiceContext.getString(CEASE_LINE_ROUTE_ENV));

        final CreateAndSubmitSuspendResumeCeaseOrderRequest.Payload payload = req.getPayload();
        final CreateAndSubmitSuspendResumeCeaseOrderRequest.RequestProductOfferDetail pod = payload.getRequestProductOfferDetails().get(0);
        String productOfferId = pod.getProductOfferID();
        OrderRequest entity = null;
        try {
            CeaseOrderResponse cor = .createCeaseOrder(customerId, req, sc, lo);

            String productOrderId = null;
            String productOrderActionIdx9 = null;

            if (cor != null
                && cor.getOutput() != null
                && cor.getOutput().getResponseProductOfferDetail() != null
                && !cor.getOutput().getResponseProductOfferDetail().isEmpty()
                && cor.getOutput().getResponseProductOfferDetail().get(0) != null) {
                productOrderId = cor.getOutput().getResponseProductOfferDetail().get(0).getProductOrderId();
                productOrderActionIdx9 = cor.getOutput().getResponseProductOfferDetail().get(0).getProductOrderActionIdx9();
            }

            entity = orderRequestService.persistNewOrderIfAbsent(customerId, productOfferId, payload, plans, StringUtils.isNotBlank(productOrderId) ? productOrderId : null, StringUtils.isNotBlank(productOrderActionIdx9) ? productOrderActionIdx9 : null);

            if (cor == null || cor.getOutput() == null || cor.getOutput().getResponseProductOfferDetail() == null || cor.getOutput().getResponseProductOfferDetail().isEmpty()) {
                if (null != entity) orderRequestService.markRejected(entity, "Empty response from ");
                return CeaseLineResponse.builder().httpCode(500).message("FAILED").orderNo(null).orderActionId(null).build();
            } else
                log.info("Obtained ceaseOrderResponse response from  for productOfferId: {}, trackingId: {},sc:{}", productOfferId, trackingId, sc);

            final List<CeaseOrderResponse.ResponseProductOfferDetail> productOfferDetail = cor.getOutput().getResponseProductOfferDetail();
            final CeaseOrderResponse.ResponseProductOfferDetail responseProductOfferDetail = productOfferDetail.get(0);

            final String errorCode = responseProductOfferDetail.getErrorCode();
            final String reason = responseProductOfferDetail.getReason();

            if (StringUtils.isNotBlank(errorCode) && StringUtils.isNotBlank(reason)) {
                // UC - 4
                if ("CEASED_PRODUCT".equalsIgnoreCase(errorCode) && reason.contains("Cease cannot be initiated on Ceased product")) {
                    log.info("Cease on Cease from DB. orderId={}, actionId={} trackingID={}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), trackingId);
                    return CeaseLineResponse.builder().httpCode(200).message("SUCCESS").orderNo(entity.getBillerOrderNo()).orderActionId(entity.getBillerOrderActionNo()).messageDescription(config.getString("cease.on.cease.message.description")).build();
                }

                // UC - 2
                if ("3009".equalsIgnoreCase(errorCode) && reason.contains("Existing Pending Orders exist on the subscriber")) {
                    Matcher matcher = Pattern.compile("(?:Suspend Order Id|Order Id)\\s*:\\s*(\\d+)").matcher(reason);
                    String orderId = null;
                    if (matcher.find()) orderId = matcher.group(1);
                    log.info("OrderId obtained from existing pending order from  response. orderId={}", orderId);
                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId{}", entity.getBillerOrderNo(),
                        entity.getBillerOrderActionNo(), trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildPendingOrderResponse400(orderId, reason, caseNumber);
                }
                //new uc
                if (("92000167".equalsIgnoreCase(errorCode) && reason != null && reason.contains("Pending order"))) {
                    Matcher matcher = Pattern.compile("Pending order (\\d+) found with reason (\\w+) that is not allowed to be cancelled").matcher(reason);
                    String orderId = null;
                    if (matcher.find()) orderId = matcher.group(1);
                    log.info("OrderId obtained from existing pending order from  response. orderId={}", orderId);
                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId{}", entity.getBillerOrderNo(),
                        entity.getBillerOrderActionNo(), trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildPendingOrderResponse400(orderId, reason, caseNumber);
                }
                // UC - 3
                if ("CEASED_PRODUCT".equalsIgnoreCase(errorCode) && reason.contains("Pending OA ID")) {
                    Matcher matcher = Pattern.compile("Pending OA ID\\s*:\\s*(\\d+)").matcher(reason);
                    String oaId = null;
                    if (matcher.find()) oaId = matcher.group(1);
                    log.info("Pending OA ID from  response: {}", oaId);
                    log.info("Cease on Pending OA from DB. Order ID: {}, Action ID: {}, trackingId: {}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), trackingId);
                    log.info("OrderActionId match check - : {}, DB: {}", oaId, entity.getBillerOrderActionNo());
                    return CeaseLineResponse.builder().httpCode(200).message("SUCCESS").orderNo(entity.getBillerOrderNo()).orderActionId(oaId).messageDescription(reason).build();
                }

                // US - 6, other failures
                if (("3009".equalsIgnoreCase(errorCode) && reason.contains("Invalid reason code")) || (("2000103".equalsIgnoreCase(errorCode) && reason.contains("Product with provided identifier could not be found")))) {
                    log.info("Bad request from . errorCode={}, reason={}, trackingId={}", errorCode, reason, trackingId);
                    return CeaseLineResponse.builder().httpCode(400).message("BAD-REQUEST").orderNo(null).orderActionId(null).messageDescription(reason).build();
                }

                // US - 6, other failures - with no reason code check
                if (StringUtils.isNotBlank(errorCode)) {
                    log.info("Biller error from . errorCode={}, reason={}, trackingId={}", errorCode, reason, trackingId);
                    return CeaseLineResponse.builder().httpCode(500).message("BILLER-ERROR").orderNo(null).orderActionId(null).messageDescription(reason).build();
                }
            }

            // UC - 1 & #5
            final String productOrderReferenceNumber = responseProductOfferDetail.getProductOrderReferenceNumber();
            if (StringUtils.isNotBlank(productOrderReferenceNumber) && entity != null) {
                entity.setStatus(OrderStatus.SUBMITTED);
                entity.setBillerOrderNo(responseProductOfferDetail.getProductOrderId());
                entity.setBillerOrderActionNo(responseProductOfferDetail.getProductOrderActionIdx9());
                entity.setBillerOrderCreationTime(Instant.now());
                entity.setRequestResponseTime(Instant.now());
                entity.setAttemptCount(entity.getAttemptCount());
                final OrderRequest save = ceaseOrderRepo.save(entity);
                log.info("Cease acknowledged by , productOrderReferenceNumber={}. Updated row: {}, trackingId: {}", productOrderReferenceNumber, save, trackingId);
                final OrderStatusAudit audit = auditService.audit(entity.getSeqNo(), "API_POST", " createCeaseOrder OK: order=" + responseProductOfferDetail.getProductOrderId());
                if (audit != null)
                    log.info("Cease acknowledged by , productOrderReferenceNumber={}. Audit: {}. TrackingID: {}", productOrderReferenceNumber, audit, trackingId);
            }

            if (entity != null && entity.getSeqNo() != null) {
                try {
                    log.info("Waiting for polling result for seqNo={} trackingId={}", entity.getSeqNo(), trackingId);
                    CompletableFuture<CeaseLineResponse> future = pollNotifier.register(entity.getSeqNo());
                    return future.get(futureTimeoutSeconds, TimeUnit.SECONDS);
                } catch (TimeoutException te) {
                    log.warn("Polling did not complete in time for seqNo={} trackingId{}", entity.getSeqNo(), trackingId);
                    CeaseLineResponse response = buildSuccessResponseIfOrderCompleted(customerId, productOfferId, entity.getSeqNo());
                    if (response != null) return response;

                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId: {}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(),
                        trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildAcceptedOrderResponse202(entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), "Order created but still processing", caseNumber);
                } catch (Exception e) {
                    log.error("Error while waiting for poll result", e);
                    CeaseLineResponse response = buildSuccessResponseIfOrderCompleted(customerId, productOfferId, entity.getSeqNo());
                    if (response != null) return response;

                    return CeaseLineResponse.builder()
                        .httpCode(500)
                        .message("FAILED")
                        .orderNo(entity.getBillerOrderNo())
                        .orderActionId(entity.getBillerOrderActionNo())
                        .build();
                }
            }
        } catch (Exception ex) {
            String errorDetailMessage = null;
            if (ex instanceof ErrorResponseFromExternalException) {
                Throwable causedBy = ((ErrorResponseFromExternalException) ex).getCausedBy();
                if (causedBy instanceof HttpServerErrorException.InternalServerError) {
                    errorDetailMessage = ((HttpServerErrorException.InternalServerError) causedBy).getResponseBodyAsString();
                }
            }
            String errorMessage = null;
            if (null != errorDetailMessage) {
                try {
                    errorMessage = extractImplErrorInfo(errorDetailMessage);
                    log.error("Failed to create cease order for productOfferId: {}, message={}, trackingId{}", productOfferId, errorMessage,
                        trackingId);
                    if (null != entity) {
                        String errorMsg = " create error: " + (errorMessage != null ? errorMessage : ex.getMessage());
                        orderRequestService.markRejected(entity, errorMsg);
                    }
                } catch (Exception e) {
                    log.error("Failed to extractImplErrorInfo, message={}, exception={} trackingId{}", e.getMessage(), e, trackingId);
                }
            }
            return CeaseLineResponse.builder().httpCode(500).message("FAILED").orderNo(null).orderActionId(null).messageDescription(errorMessage).build();
        }
        return CeaseLineResponse.builder().httpCode(500).message("FAILED").build();
    }

    public OrderStatusView getStatus(String customerId, String productOfferId) {
        return ceaseOrderRepo.findByCustomerIdAndProductOfferId(customerId, productOfferId)
            .map(orderRequest -> OrderStatusView.builder()
                .seqNo(orderRequest.getSeqNo())
                .bpmOrderId(orderRequest.getBpmOrderId())
                .lineId(orderRequest.getLineId())
                .customerId(orderRequest.getCustomerId())
                .productOfferId(orderRequest.getProductOfferId())
                .reqCreateTime(orderRequest.getReqCreateTime())
                .requestResponseTime(orderRequest.getRequestResponseTime())
                .status(orderRequest.getStatus().name())
                .billerOrderCreationTime(orderRequest.getBillerOrderCreationTime())
                .billerOrderNo(orderRequest.getBillerOrderNo())
                .billerOrderActionNo(orderRequest.getBillerOrderActionNo())
                .statusUpdateDate(orderRequest.getStatusUpdateDate())
                .statusUpdateSource(orderRequest.getStatusUpdateSource())
                .rejectReason(orderRequest.getRejectReason())
                .attemptCount(orderRequest.getAttemptCount())
                .submittedAttemptsRemaining(orderRequest.getSubmittedAttemptsRemaining())
                .completedAttemptsRemaining(orderRequest.getCompletedAttemptsRemaining())
                .nextPollAt(orderRequest.getNextPollAt())
                .lastPolledAt(orderRequest.getLastPolledAt())
                .lockUntil(orderRequest.getLockUntil())
                .lockedBy(orderRequest.getLockedBy())
                .updateContext(orderRequest.getUpdateContext())
                .trackingId(orderRequest.getTrackingId())
                .routeEnv(orderRequest.getRouteEnv())
                .build())
            .orElse(null);
    }

    private String extractImplErrorInfo(final String messageDescription) throws Exception {
        JsonNode root = mapper.readTree(messageDescription);
        JsonNode implErrorInfo = root.path("errors").get(0)
            .path("nestedError").path("ImplErrorInfo");
        return mapper.writeValueAsString(implErrorInfo);
    }

    private CeaseLineResponse buildSuccessResponseIfOrderCompleted(String customerId, String productOfferId, Long seqNo) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        Optional<OrderRequest> orderRequest = ceaseOrderRepo.findByCustomerIdAndProductOfferId(customerId, productOfferId);
        if (orderRequest.isPresent() && orderRequest.get().getStatus().equals(OrderStatus.SUCCESS)) {
            log.warn("Returning background process status: success for seqNo={} trackingId{}", seqNo, trackingId);
            return CeaseLineResponse.builder()
                .httpCode(200)
                .message("SUCCESS")
                .orderNo(orderRequest.get().getBillerOrderNo())
                .orderActionId(orderRequest.get().getBillerOrderActionNo())
                .build();
        }
        return null;
    }

    private IopCaseRequest buildIopCaseRequest(String customerId, String billerOrderNo, String billerOrderActionNo, String productOfferId, String bpmOrderId) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String uDescription =String.format(
            "Voluntary Cease order has been stuck..\n\nCustomerID:%s\nProductOfferID:%s\nBPMOrderID:%s\nOrderID:%s\nOrderActionID:%s",
            customerId, productOfferId, bpmOrderId, billerOrderNo, billerOrderActionNo
        );
        log.info("AMA case description: {} trackingID{}", uDescription, trackingId);
        String correlation = String.format("%s:%s:%s", trackingId, billerOrderNo, billerOrderActionNo);

        return IopCaseRequest.builder()
            .uOpenedFor("")
            .uAssignmentGroup("")
            .uSymptoms("-")
            .uCategory("XM - Order Management")
            .uSubcategory("Stuck Order")
            .uChannel("API")
            .uShortDescription(" Voluntary Cease - Stuck Order")
            .uDescription(uDescription)
            .uPriority("High")
            .uCorrelationId(correlation)
            .uAssignedTo(ServiceConfig.getCeaseLineAssignToUser())
            .build();
    }

    private IopCaseRequest buildIopCaseRequestWithState(
        String uNumber,
        String uAssignedTo,
        String uState
    ) {
        return IopCaseRequest.builder()
            .uNumber(uNumber)
            .uAssignedTo(uAssignedTo)
            .uState(uState)
            .build();
    }

    private CaseNumberAndState createCaseAndGetCaseNumber(IopCaseRequest request1, HttpHeaders httpHeaders) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String caseResponse = amaServiceConnector.callAmaService(
            HttpMethod.POST,
            ServiceConfig.getAmaApiCaseUrl(),
            request1,
            httpHeaders,
            String.class
        );

        List<Map<String, Object>> responseList;
        try {
            responseList = mapper.readValue(caseResponse, new TypeReference<List<Map<String, Object>>>() {});
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to parse case response", e);
        }

        if (!responseList.isEmpty()) {
            Map<String, Object> firstItem = responseList.get(0);
            if (firstItem.containsKey("result")) {
                List<ResultItem> resultItems = mapper.convertValue(firstItem.get("result"), new TypeReference<List<ResultItem>>() {});
                ResultItem firstResult = resultItems.get(0);
                log.error("Result status: {}, error: {}", firstResult.getStatus(), firstResult.getError_message());
                throw new CaseCreationException(firstResult.getError_message());
            } else {
                CaseResponse caseResponseHappy = mapper.convertValue(firstItem, CaseResponse.class);
                log.info("Case number: {}, trackingId: {}", caseResponseHappy.getCase_number(),
                    trackingId);
                return new CaseNumberAndState(caseResponseHappy.getCase_number(), caseResponseHappy.getState());
            }
        }
        throw new CaseCreationException("Empty response from AMA service");
    }

    private void fetchCaseStateAsync(IopCaseRequest requestWithState, String caseNumber, HttpHeaders httpHeaders) {
        CompletableFuture.runAsync(() -> {
            try {

                CaseNumberAndState caseNumberAndState = createCaseAndGetCaseNumber(requestWithState, httpHeaders);
                if (caseNumberAndState != null) {
                    log.info("Fetched case state for fire and forget AMA in progress {}: {}. Built request with state: {}",
                        caseNumberAndState.getCaseNumber(), caseNumberAndState.getState(), requestWithState);
                }
            } catch (Exception e) {
                log.warn("Failed to fetch case state for fire and forget for in progress scenario  {}: {}", caseNumber, e.getMessage());
            }
        });
    }

    private CeaseLineResponse buildPendingOrderResponse400(String orderId, String reason, String caseNumber) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String messageDesc = amaCaseEnabled && caseNumber != null
            ? reason + " Case Number: " + caseNumber
            : reason;
        log.info("Returning PENDING-ORDER response 400 for orderId: {}, messageDesc: {}, trackingId{}", orderId, messageDesc,
            trackingId);
        return CeaseLineResponse.builder()
            .httpCode(400)
            .message("PENDING-ORDER")
            .orderNo(orderId)
            .orderActionId(null)
            .messageDescription(messageDesc)
            .build();
    }

    private CeaseLineResponse buildAcceptedOrderResponse202(String orderId, final String orderActionId, String reason, String caseNumber) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String messageDesc = amaCaseEnabled && caseNumber != null
            ? reason + " Case Number: " + caseNumber
            : reason;
        log.info("Returning ACCEPTED response 202 for orderId: {}, messageDesc: {}, trackingId{}", orderId, messageDesc,
            trackingId);
        return CeaseLineResponse.builder()
            .httpCode(202)
            .message("ACCEPTED")
            .orderNo(orderId)
            .orderActionId(orderActionId)
            .messageDescription(messageDesc)
            .build();
    }

    private String maybeCreateAmaCase(String customerId, OrderRequest entity, String productOfferId, String bpmOrderId, HttpHeaders httpHeaders) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        if (!amaCaseEnabled) return null;
        IopCaseRequest request = buildIopCaseRequest(customerId, entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), productOfferId, bpmOrderId);
        String caseNumber = createCaseAndGetCaseNumber(request, httpHeaders).getCaseNumber();
        IopCaseRequest requestWithState = buildIopCaseRequestWithState(caseNumber, ServiceConfig.getCeaseLineAssignToUser(), "15");
        log.info("Created case in AMA for stuck cease order. Order ID: {}, Action ID: {}, Case Number: {}, trackingId{}",
            entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), caseNumber, trackingId);
        // Clone headers and set tracking ID explicitly
        HttpHeaders asyncHeaders = new HttpHeaders();
        asyncHeaders.putAll(httpHeaders);
        asyncHeaders.set(CEASE_LINE_TRACKING_ID, trackingId);

        // Guard the async call
        if (fetchStateAsyncEnabled) {
            fetchCaseStateAsync(requestWithState, caseNumber, asyncHeaders);
        } else {
            log.info("Async case state fetching for in progress state is disabled for case: {}", caseNumber);
        }
        return caseNumber;
    }
```public class OrderService {

    private final OrderRequestRepository ceaseOrderRepo;
    private final AuditService auditService;
    private final OrderProcessor ;
    private final PollPlans plans;
    private final PollNotifier pollNotifier;

    @Value("${app.case.fetch-state-async.enabled:false}")
    private boolean fetchStateAsyncEnabled;

    @Value("${cease.future.timeout-seconds:10}")
    private int futureTimeoutSeconds;
    private final OrderRequestService orderRequestService;
    @Autowired
    private ServiceConfiguration config;
    @Autowired
    private ObjectMapper mapper;

    @Autowired
    private AmaServiceConnector amaServiceConnector;

    @Autowired
    private ServiceConfig ServiceConfig;

    @Value("${ama.case.enabled:false}")
    private boolean amaCaseEnabled;

    public CeaseLineResponse createCeaseOrder(String customerId, CreateAndSubmitSuspendResumeCeaseOrderRequest req,String sc,String lo) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.add(CEASE_LINE_TRACKING_ID,trackingId);
        httpHeaders.add(CEASE_LINE_ROUTE_ENV, ServiceContext.getString(CEASE_LINE_ROUTE_ENV));

        final CreateAndSubmitSuspendResumeCeaseOrderRequest.Payload payload = req.getPayload();
        final CreateAndSubmitSuspendResumeCeaseOrderRequest.RequestProductOfferDetail pod = payload.getRequestProductOfferDetails().get(0);
        String productOfferId = pod.getProductOfferID();
        OrderRequest entity = null;
        try {
            CeaseOrderResponse cor = .createCeaseOrder(customerId, req, sc, lo);

            String productOrderId = null;
            String productOrderActionIdx9 = null;

            if (cor != null
                && cor.getOutput() != null
                && cor.getOutput().getResponseProductOfferDetail() != null
                && !cor.getOutput().getResponseProductOfferDetail().isEmpty()
                && cor.getOutput().getResponseProductOfferDetail().get(0) != null) {
                productOrderId = cor.getOutput().getResponseProductOfferDetail().get(0).getProductOrderId();
                productOrderActionIdx9 = cor.getOutput().getResponseProductOfferDetail().get(0).getProductOrderActionIdx9();
            }

            entity = orderRequestService.persistNewOrderIfAbsent(customerId, productOfferId, payload, plans, StringUtils.isNotBlank(productOrderId) ? productOrderId : null, StringUtils.isNotBlank(productOrderActionIdx9) ? productOrderActionIdx9 : null);

            if (cor == null || cor.getOutput() == null || cor.getOutput().getResponseProductOfferDetail() == null || cor.getOutput().getResponseProductOfferDetail().isEmpty()) {
                if (null != entity) orderRequestService.markRejected(entity, "Empty response from ");
                return CeaseLineResponse.builder().httpCode(500).message("FAILED").orderNo(null).orderActionId(null).build();
            } else
                log.info("Obtained ceaseOrderResponse response from  for productOfferId: {}, trackingId: {},sc:{}", productOfferId, trackingId, sc);

            final List<CeaseOrderResponse.ResponseProductOfferDetail> productOfferDetail = cor.getOutput().getResponseProductOfferDetail();
            final CeaseOrderResponse.ResponseProductOfferDetail responseProductOfferDetail = productOfferDetail.get(0);

            final String errorCode = responseProductOfferDetail.getErrorCode();
            final String reason = responseProductOfferDetail.getReason();

            if (StringUtils.isNotBlank(errorCode) && StringUtils.isNotBlank(reason)) {
                // UC - 4
                if ("CEASED_PRODUCT".equalsIgnoreCase(errorCode) && reason.contains("Cease cannot be initiated on Ceased product")) {
                    log.info("Cease on Cease from DB. orderId={}, actionId={} trackingID={}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), trackingId);
                    return CeaseLineResponse.builder().httpCode(200).message("SUCCESS").orderNo(entity.getBillerOrderNo()).orderActionId(entity.getBillerOrderActionNo()).messageDescription(config.getString("cease.on.cease.message.description")).build();
                }

                // UC - 2
                if ("3009".equalsIgnoreCase(errorCode) && reason.contains("Existing Pending Orders exist on the subscriber")) {
                    Matcher matcher = Pattern.compile("(?:Suspend Order Id|Order Id)\\s*:\\s*(\\d+)").matcher(reason);
                    String orderId = null;
                    if (matcher.find()) orderId = matcher.group(1);
                    log.info("OrderId obtained from existing pending order from  response. orderId={}", orderId);
                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId{}", entity.getBillerOrderNo(),
                        entity.getBillerOrderActionNo(), trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildPendingOrderResponse400(orderId, reason, caseNumber);
                }
                //new uc
                if (("92000167".equalsIgnoreCase(errorCode) && reason != null && reason.contains("Pending order"))) {
                    Matcher matcher = Pattern.compile("Pending order (\\d+) found with reason (\\w+) that is not allowed to be cancelled").matcher(reason);
                    String orderId = null;
                    if (matcher.find()) orderId = matcher.group(1);
                    log.info("OrderId obtained from existing pending order from  response. orderId={}", orderId);
                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId{}", entity.getBillerOrderNo(),
                        entity.getBillerOrderActionNo(), trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildPendingOrderResponse400(orderId, reason, caseNumber);
                }
                // UC - 3
                if ("CEASED_PRODUCT".equalsIgnoreCase(errorCode) && reason.contains("Pending OA ID")) {
                    Matcher matcher = Pattern.compile("Pending OA ID\\s*:\\s*(\\d+)").matcher(reason);
                    String oaId = null;
                    if (matcher.find()) oaId = matcher.group(1);
                    log.info("Pending OA ID from  response: {}", oaId);
                    log.info("Cease on Pending OA from DB. Order ID: {}, Action ID: {}, trackingId: {}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), trackingId);
                    log.info("OrderActionId match check - : {}, DB: {}", oaId, entity.getBillerOrderActionNo());
                    return CeaseLineResponse.builder().httpCode(200).message("SUCCESS").orderNo(entity.getBillerOrderNo()).orderActionId(oaId).messageDescription(reason).build();
                }

                // US - 6, other failures
                if (("3009".equalsIgnoreCase(errorCode) && reason.contains("Invalid reason code")) || (("2000103".equalsIgnoreCase(errorCode) && reason.contains("Product with provided identifier could not be found")))) {
                    log.info("Bad request from . errorCode={}, reason={}, trackingId={}", errorCode, reason, trackingId);
                    return CeaseLineResponse.builder().httpCode(400).message("BAD-REQUEST").orderNo(null).orderActionId(null).messageDescription(reason).build();
                }

                // US - 6, other failures - with no reason code check
                if (StringUtils.isNotBlank(errorCode)) {
                    log.info("Biller error from . errorCode={}, reason={}, trackingId={}", errorCode, reason, trackingId);
                    return CeaseLineResponse.builder().httpCode(500).message("BILLER-ERROR").orderNo(null).orderActionId(null).messageDescription(reason).build();
                }
            }

            // UC - 1 & #5
            final String productOrderReferenceNumber = responseProductOfferDetail.getProductOrderReferenceNumber();
            if (StringUtils.isNotBlank(productOrderReferenceNumber) && entity != null) {
                entity.setStatus(OrderStatus.SUBMITTED);
                entity.setBillerOrderNo(responseProductOfferDetail.getProductOrderId());
                entity.setBillerOrderActionNo(responseProductOfferDetail.getProductOrderActionIdx9());
                entity.setBillerOrderCreationTime(Instant.now());
                entity.setRequestResponseTime(Instant.now());
                entity.setAttemptCount(entity.getAttemptCount());
                final OrderRequest save = ceaseOrderRepo.save(entity);
                log.info("Cease acknowledged by , productOrderReferenceNumber={}. Updated row: {}, trackingId: {}", productOrderReferenceNumber, save, trackingId);
                final OrderStatusAudit audit = auditService.audit(entity.getSeqNo(), "API_POST", " createCeaseOrder OK: order=" + responseProductOfferDetail.getProductOrderId());
                if (audit != null)
                    log.info("Cease acknowledged by , productOrderReferenceNumber={}. Audit: {}. TrackingID: {}", productOrderReferenceNumber, audit, trackingId);
            }

            if (entity != null && entity.getSeqNo() != null) {
                try {
                    log.info("Waiting for polling result for seqNo={} trackingId={}", entity.getSeqNo(), trackingId);
                    CompletableFuture<CeaseLineResponse> future = pollNotifier.register(entity.getSeqNo());
                    return future.get(futureTimeoutSeconds, TimeUnit.SECONDS);
                } catch (TimeoutException te) {
                    log.warn("Polling did not complete in time for seqNo={} trackingId{}", entity.getSeqNo(), trackingId);
                    CeaseLineResponse response = buildSuccessResponseIfOrderCompleted(customerId, productOfferId, entity.getSeqNo());
                    if (response != null) return response;

                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId: {}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(),
                        trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildAcceptedOrderResponse202(entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), "Order created but still processing", caseNumber);
                } catch (Exception e) {
                    log.error("Error while waiting for poll result", e);
                    CeaseLineResponse response = buildSuccessResponseIfOrderCompleted(customerId, productOfferId, entity.getSeqNo());
                    if (response != null) return response;

                    return CeaseLineResponse.builder()
                        .httpCode(500)
                        .message("FAILED")
                        .orderNo(entity.getBillerOrderNo())
                        .orderActionId(entity.getBillerOrderActionNo())
                        .build();
                }
            }
        } catch (Exception ex) {
            String errorDetailMessage = null;
            if (ex instanceof ErrorResponseFromExternalException) {
                Throwable causedBy = ((ErrorResponseFromExternalException) ex).getCausedBy();
                if (causedBy instanceof HttpServerErrorException.InternalServerError) {
                    errorDetailMessage = ((HttpServerErrorException.InternalServerError) causedBy).getResponseBodyAsString();
                }
            }
            String errorMessage = null;
            if (null != errorDetailMessage) {
                try {
                    errorMessage = extractImplErrorInfo(errorDetailMessage);
                    log.error("Failed to create cease order for productOfferId: {}, message={}, trackingId{}", productOfferId, errorMessage,
                        trackingId);
                    if (null != entity) {
                        String errorMsg = " create error: " + (errorMessage != null ? errorMessage : ex.getMessage());
                        orderRequestService.markRejected(entity, errorMsg);
                    }
                } catch (Exception e) {
                    log.error("Failed to extractImplErrorInfo, message={}, exception={} trackingId{}", e.getMessage(), e, trackingId);
                }
            }
            return CeaseLineResponse.builder().httpCode(500).message("FAILED").orderNo(null).orderActionId(null).messageDescription(errorMessage).build();
        }
        return CeaseLineResponse.builder().httpCode(500).message("FAILED").build();
    }

    public OrderStatusView getStatus(String customerId, String productOfferId) {
        return ceaseOrderRepo.findByCustomerIdAndProductOfferId(customerId, productOfferId)
            .map(orderRequest -> OrderStatusView.builder()
                .seqNo(orderRequest.getSeqNo())
                .bpmOrderId(orderRequest.getBpmOrderId())
                .lineId(orderRequest.getLineId())
                .customerId(orderRequest.getCustomerId())
                .productOfferId(orderRequest.getProductOfferId())
                .reqCreateTime(orderRequest.getReqCreateTime())
                .requestResponseTime(orderRequest.getRequestResponseTime())
                .status(orderRequest.getStatus().name())
                .billerOrderCreationTime(orderRequest.getBillerOrderCreationTime())
                .billerOrderNo(orderRequest.getBillerOrderNo())
                .billerOrderActionNo(orderRequest.getBillerOrderActionNo())
                .statusUpdateDate(orderRequest.getStatusUpdateDate())
                .statusUpdateSource(orderRequest.getStatusUpdateSource())
                .rejectReason(orderRequest.getRejectReason())
                .attemptCount(orderRequest.getAttemptCount())
                .submittedAttemptsRemaining(orderRequest.getSubmittedAttemptsRemaining())
                .completedAttemptsRemaining(orderRequest.getCompletedAttemptsRemaining())
                .nextPollAt(orderRequest.getNextPollAt())
                .lastPolledAt(orderRequest.getLastPolledAt())
                .lockUntil(orderRequest.getLockUntil())
                .lockedBy(orderRequest.getLockedBy())
                .updateContext(orderRequest.getUpdateContext())
                .trackingId(orderRequest.getTrackingId())
                .routeEnv(orderRequest.getRouteEnv())
                .build())
            .orElse(null);
    }

    private String extractImplErrorInfo(final String messageDescription) throws Exception {
        JsonNode root = mapper.readTree(messageDescription);
        JsonNode implErrorInfo = root.path("errors").get(0)
            .path("nestedError").path("ImplErrorInfo");
        return mapper.writeValueAsString(implErrorInfo);
    }

    private CeaseLineResponse buildSuccessResponseIfOrderCompleted(String customerId, String productOfferId, Long seqNo) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        Optional<OrderRequest> orderRequest = ceaseOrderRepo.findByCustomerIdAndProductOfferId(customerId, productOfferId);
        if (orderRequest.isPresent() && orderRequest.get().getStatus().equals(OrderStatus.SUCCESS)) {
            log.warn("Returning background process status: success for seqNo={} trackingId{}", seqNo, trackingId);
            return CeaseLineResponse.builder()
                .httpCode(200)
                .message("SUCCESS")
                .orderNo(orderRequest.get().getBillerOrderNo())
                .orderActionId(orderRequest.get().getBillerOrderActionNo())
                .build();
        }
        return null;
    }

    private IopCaseRequest buildIopCaseRequest(String customerId, String billerOrderNo, String billerOrderActionNo, String productOfferId, String bpmOrderId) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String uDescription =String.format(
            "Voluntary Cease order has been stuck..\n\nCustomerID:%s\nProductOfferID:%s\nBPMOrderID:%s\nOrderID:%s\nOrderActionID:%s",
            customerId, productOfferId, bpmOrderId, billerOrderNo, billerOrderActionNo
        );
        log.info("AMA case description: {} trackingID{}", uDescription, trackingId);
        String correlation = String.format("%s:%s:%s", trackingId, billerOrderNo, billerOrderActionNo);

        return IopCaseRequest.builder()
            .uOpenedFor("")
            .uAssignmentGroup("")
            .uSymptoms("-")
            .uCategory("XM - Order Management")
            .uSubcategory("Stuck Order")
            .uChannel("API")
            .uShortDescription(" Voluntary Cease - Stuck Order")
            .uDescription(uDescription)
            .uPriority("High")
            .uCorrelationId(correlation)
            .uAssignedTo(ServiceConfig.getCeaseLineAssignToUser())
            .build();
    }

    private IopCaseRequest buildIopCaseRequestWithState(
        String uNumber,
        String uAssignedTo,
        String uState
    ) {
        return IopCaseRequest.builder()
            .uNumber(uNumber)
            .uAssignedTo(uAssignedTo)
            .uState(uState)
            .build();
    }

    private CaseNumberAndState createCaseAndGetCaseNumber(IopCaseRequest request1, HttpHeaders httpHeaders) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String caseResponse = amaServiceConnector.callAmaService(
            HttpMethod.POST,
            ServiceConfig.getAmaApiCaseUrl(),
            request1,
            httpHeaders,
            String.class
        );

        List<Map<String, Object>> responseList;
        try {
            responseList = mapper.readValue(caseResponse, new TypeReference<List<Map<String, Object>>>() {});
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to parse case response", e);
        }

        if (!responseList.isEmpty()) {
            Map<String, Object> firstItem = responseList.get(0);
            if (firstItem.containsKey("result")) {
                List<ResultItem> resultItems = mapper.convertValue(firstItem.get("result"), new TypeReference<List<ResultItem>>() {});
                ResultItem firstResult = resultItems.get(0);
                log.error("Result status: {}, error: {}", firstResult.getStatus(), firstResult.getError_message());
                throw new CaseCreationException(firstResult.getError_message());
            } else {
                CaseResponse caseResponseHappy = mapper.convertValue(firstItem, CaseResponse.class);
                log.info("Case number: {}, trackingId: {}", caseResponseHappy.getCase_number(),
                    trackingId);
                return new CaseNumberAndState(caseResponseHappy.getCase_number(), caseResponseHappy.getState());
            }
        }
        throw new CaseCreationException("Empty response from AMA service");
    }

    private void fetchCaseStateAsync(IopCaseRequest requestWithState, String caseNumber, HttpHeaders httpHeaders) {
        CompletableFuture.runAsync(() -> {
            try {

                CaseNumberAndState caseNumberAndState = createCaseAndGetCaseNumber(requestWithState, httpHeaders);
                if (caseNumberAndState != null) {
                    log.info("Fetched case state for fire and forget AMA in progress {}: {}. Built request with state: {}",
                        caseNumberAndState.getCaseNumber(), caseNumberAndState.getState(), requestWithState);
                }
            } catch (Exception e) {
                log.warn("Failed to fetch case state for fire and forget for in progress scenario  {}: {}", caseNumber, e.getMessage());
            }
        });
    }

    private CeaseLineResponse buildPendingOrderResponse400(String orderId, String reason, String caseNumber) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String messageDesc = amaCaseEnabled && caseNumber != null
            ? reason + " Case Number: " + caseNumber
            : reason;
        log.info("Returning PENDING-ORDER response 400 for orderId: {}, messageDesc: {}, trackingId{}", orderId, messageDesc,
            trackingId);
        return CeaseLineResponse.builder()
            .httpCode(400)
            .message("PENDING-ORDER")
            .orderNo(orderId)
            .orderActionId(null)
            .messageDescription(messageDesc)
            .build();
    }

    private CeaseLineResponse buildAcceptedOrderResponse202(String orderId, final String orderActionId, String reason, String caseNumber) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String messageDesc = amaCaseEnabled && caseNumber != null
            ? reason + " Case Number: " + caseNumber
            : reason;
        log.info("Returning ACCEPTED response 202 for orderId: {}, messageDesc: {}, trackingId{}", orderId, messageDesc,
            trackingId);
        return CeaseLineResponse.builder()
            .httpCode(202)
            .message("ACCEPTED")
            .orderNo(orderId)
            .orderActionId(orderActionId)
            .messageDescription(messageDesc)
            .build();
    }

    private String maybeCreateAmaCase(String customerId, OrderRequest entity, String productOfferId, String bpmOrderId, HttpHeaders httpHeaders) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        if (!amaCaseEnabled) return null;
        IopCaseRequest request = buildIopCaseRequest(customerId, entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), productOfferId, bpmOrderId);
        String caseNumber = createCaseAndGetCaseNumber(request, httpHeaders).getCaseNumber();
        IopCaseRequest requestWithState = buildIopCaseRequestWithState(caseNumber, ServiceConfig.getCeaseLineAssignToUser(), "15");
        log.info("Created case in AMA for stuck cease order. Order ID: {}, Action ID: {}, Case Number: {}, trackingId{}",
            entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), caseNumber, trackingId);
        // Clone headers and set tracking ID explicitly
        HttpHeaders asyncHeaders = new HttpHeaders();
        asyncHeaders.putAll(httpHeaders);
        asyncHeaders.set(CEASE_LINE_TRACKING_ID, trackingId);

        // Guard the async call
        if (fetchStateAsyncEnabled) {
            fetchCaseStateAsync(requestWithState, caseNumber, asyncHeaders);
        } else {
            log.info("Async case state fetching for in progress state is disabled for case: {}", caseNumber);
        }
        return caseNumber;
    }
public class OrderService {

    private final OrderRequestRepository ceaseOrderRepo;
    private final AuditService auditService;
    private final OrderProcessor ;
    private final PollPlans plans;
    private final PollNotifier pollNotifier;

    @Value("${app.case.fetch-state-async.enabled:false}")
    private boolean fetchStateAsyncEnabled;

    @Value("${cease.future.timeout-seconds:10}")
    private int futureTimeoutSeconds;
    private final OrderRequestService orderRequestService;
    @Autowired
    private ServiceConfiguration config;
    @Autowired
    private ObjectMapper mapper;

    @Autowired
    private AmaServiceConnector amaServiceConnector;

    @Autowired
    private ServiceConfig ServiceConfig;

    @Value("${ama.case.enabled:false}")
    private boolean amaCaseEnabled;

    public CeaseLineResponse createCeaseOrder(String customerId, CreateAndSubmitSuspendResumeCeaseOrderRequest req,String sc,String lo) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.add(CEASE_LINE_TRACKING_ID,trackingId);
        httpHeaders.add(CEASE_LINE_ROUTE_ENV, ServiceContext.getString(CEASE_LINE_ROUTE_ENV));

        final CreateAndSubmitSuspendResumeCeaseOrderRequest.Payload payload = req.getPayload();
        final CreateAndSubmitSuspendResumeCeaseOrderRequest.RequestProductOfferDetail pod = payload.getRequestProductOfferDetails().get(0);
        String productOfferId = pod.getProductOfferID();
        OrderRequest entity = null;
        try {
            CeaseOrderResponse cor = .createCeaseOrder(customerId, req, sc, lo);

            String productOrderId = null;
            String productOrderActionIdx9 = null;

            if (cor != null
                && cor.getOutput() != null
                && cor.getOutput().getResponseProductOfferDetail() != null
                && !cor.getOutput().getResponseProductOfferDetail().isEmpty()
                && cor.getOutput().getResponseProductOfferDetail().get(0) != null) {
                productOrderId = cor.getOutput().getResponseProductOfferDetail().get(0).getProductOrderId();
                productOrderActionIdx9 = cor.getOutput().getResponseProductOfferDetail().get(0).getProductOrderActionIdx9();
            }

            entity = orderRequestService.persistNewOrderIfAbsent(customerId, productOfferId, payload, plans, StringUtils.isNotBlank(productOrderId) ? productOrderId : null, StringUtils.isNotBlank(productOrderActionIdx9) ? productOrderActionIdx9 : null);

            if (cor == null || cor.getOutput() == null || cor.getOutput().getResponseProductOfferDetail() == null || cor.getOutput().getResponseProductOfferDetail().isEmpty()) {
                if (null != entity) orderRequestService.markRejected(entity, "Empty response from ");
                return CeaseLineResponse.builder().httpCode(500).message("FAILED").orderNo(null).orderActionId(null).build();
            } else
                log.info("Obtained ceaseOrderResponse response from  for productOfferId: {}, trackingId: {},sc:{}", productOfferId, trackingId, sc);

            final List<CeaseOrderResponse.ResponseProductOfferDetail> productOfferDetail = cor.getOutput().getResponseProductOfferDetail();
            final CeaseOrderResponse.ResponseProductOfferDetail responseProductOfferDetail = productOfferDetail.get(0);

            final String errorCode = responseProductOfferDetail.getErrorCode();
            final String reason = responseProductOfferDetail.getReason();

            if (StringUtils.isNotBlank(errorCode) && StringUtils.isNotBlank(reason)) {
                // UC - 4
                if ("CEASED_PRODUCT".equalsIgnoreCase(errorCode) && reason.contains("Cease cannot be initiated on Ceased product")) {
                    log.info("Cease on Cease from DB. orderId={}, actionId={} trackingID={}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), trackingId);
                    return CeaseLineResponse.builder().httpCode(200).message("SUCCESS").orderNo(entity.getBillerOrderNo()).orderActionId(entity.getBillerOrderActionNo()).messageDescription(config.getString("cease.on.cease.message.description")).build();
                }

                // UC - 2
                if ("3009".equalsIgnoreCase(errorCode) && reason.contains("Existing Pending Orders exist on the subscriber")) {
                    Matcher matcher = Pattern.compile("(?:Suspend Order Id|Order Id)\\s*:\\s*(\\d+)").matcher(reason);
                    String orderId = null;
                    if (matcher.find()) orderId = matcher.group(1);
                    log.info("OrderId obtained from existing pending order from  response. orderId={}", orderId);
                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId{}", entity.getBillerOrderNo(),
                        entity.getBillerOrderActionNo(), trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildPendingOrderResponse400(orderId, reason, caseNumber);
                }
                //new uc
                if (("92000167".equalsIgnoreCase(errorCode) && reason != null && reason.contains("Pending order"))) {
                    Matcher matcher = Pattern.compile("Pending order (\\d+) found with reason (\\w+) that is not allowed to be cancelled").matcher(reason);
                    String orderId = null;
                    if (matcher.find()) orderId = matcher.group(1);
                    log.info("OrderId obtained from existing pending order from  response. orderId={}", orderId);
                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId{}", entity.getBillerOrderNo(),
                        entity.getBillerOrderActionNo(), trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildPendingOrderResponse400(orderId, reason, caseNumber);
                }
                // UC - 3
                if ("CEASED_PRODUCT".equalsIgnoreCase(errorCode) && reason.contains("Pending OA ID")) {
                    Matcher matcher = Pattern.compile("Pending OA ID\\s*:\\s*(\\d+)").matcher(reason);
                    String oaId = null;
                    if (matcher.find()) oaId = matcher.group(1);
                    log.info("Pending OA ID from  response: {}", oaId);
                    log.info("Cease on Pending OA from DB. Order ID: {}, Action ID: {}, trackingId: {}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), trackingId);
                    log.info("OrderActionId match check - : {}, DB: {}", oaId, entity.getBillerOrderActionNo());
                    return CeaseLineResponse.builder().httpCode(200).message("SUCCESS").orderNo(entity.getBillerOrderNo()).orderActionId(oaId).messageDescription(reason).build();
                }

                // US - 6, other failures
                if (("3009".equalsIgnoreCase(errorCode) && reason.contains("Invalid reason code")) || (("2000103".equalsIgnoreCase(errorCode) && reason.contains("Product with provided identifier could not be found")))) {
                    log.info("Bad request from . errorCode={}, reason={}, trackingId={}", errorCode, reason, trackingId);
                    return CeaseLineResponse.builder().httpCode(400).message("BAD-REQUEST").orderNo(null).orderActionId(null).messageDescription(reason).build();
                }

                // US - 6, other failures - with no reason code check
                if (StringUtils.isNotBlank(errorCode)) {
                    log.info("Biller error from . errorCode={}, reason={}, trackingId={}", errorCode, reason, trackingId);
                    return CeaseLineResponse.builder().httpCode(500).message("BILLER-ERROR").orderNo(null).orderActionId(null).messageDescription(reason).build();
                }
            }

            // UC - 1 & #5
            final String productOrderReferenceNumber = responseProductOfferDetail.getProductOrderReferenceNumber();
            if (StringUtils.isNotBlank(productOrderReferenceNumber) && entity != null) {
                entity.setStatus(OrderStatus.SUBMITTED);
                entity.setBillerOrderNo(responseProductOfferDetail.getProductOrderId());
                entity.setBillerOrderActionNo(responseProductOfferDetail.getProductOrderActionIdx9());
                entity.setBillerOrderCreationTime(Instant.now());
                entity.setRequestResponseTime(Instant.now());
                entity.setAttemptCount(entity.getAttemptCount());
                final OrderRequest save = ceaseOrderRepo.save(entity);
                log.info("Cease acknowledged by , productOrderReferenceNumber={}. Updated row: {}, trackingId: {}", productOrderReferenceNumber, save, trackingId);
                final OrderStatusAudit audit = auditService.audit(entity.getSeqNo(), "API_POST", " createCeaseOrder OK: order=" + responseProductOfferDetail.getProductOrderId());
                if (audit != null)
                    log.info("Cease acknowledged by , productOrderReferenceNumber={}. Audit: {}. TrackingID: {}", productOrderReferenceNumber, audit, trackingId);
            }

            if (entity != null && entity.getSeqNo() != null) {
                try {
                    log.info("Waiting for polling result for seqNo={} trackingId={}", entity.getSeqNo(), trackingId);
                    CompletableFuture<CeaseLineResponse> future = pollNotifier.register(entity.getSeqNo());
                    return future.get(futureTimeoutSeconds, TimeUnit.SECONDS);
                } catch (TimeoutException te) {
                    log.warn("Polling did not complete in time for seqNo={} trackingId{}", entity.getSeqNo(), trackingId);
                    CeaseLineResponse response = buildSuccessResponseIfOrderCompleted(customerId, productOfferId, entity.getSeqNo());
                    if (response != null) return response;

                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId: {}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(),
                        trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildAcceptedOrderResponse202(entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), "Order created but still processing", caseNumber);
                } catch (Exception e) {
                    log.error("Error while waiting for poll result", e);
                    CeaseLineResponse response = buildSuccessResponseIfOrderCompleted(customerId, productOfferId, entity.getSeqNo());
                    if (response != null) return response;

                    return CeaseLineResponse.builder()
                        .httpCode(500)
                        .message("FAILED")
                        .orderNo(entity.getBillerOrderNo())
                        .orderActionId(entity.getBillerOrderActionNo())
                        .build();
                }
            }
        } catch (Exception ex) {
            String errorDetailMessage = null;
            if (ex instanceof ErrorResponseFromExternalException) {
                Throwable causedBy = ((ErrorResponseFromExternalException) ex).getCausedBy();
                if (causedBy instanceof HttpServerErrorException.InternalServerError) {
                    errorDetailMessage = ((HttpServerErrorException.InternalServerError) causedBy).getResponseBodyAsString();
                }
            }
            String errorMessage = null;
            if (null != errorDetailMessage) {
                try {
                    errorMessage = extractImplErrorInfo(errorDetailMessage);
                    log.error("Failed to create cease order for productOfferId: {}, message={}, trackingId{}", productOfferId, errorMessage,
                        trackingId);
                    if (null != entity) {
                        String errorMsg = " create error: " + (errorMessage != null ? errorMessage : ex.getMessage());
                        orderRequestService.markRejected(entity, errorMsg);
                    }
                } catch (Exception e) {
                    log.error("Failed to extractImplErrorInfo, message={}, exception={} trackingId{}", e.getMessage(), e, trackingId);
                }
            }
            return CeaseLineResponse.builder().httpCode(500).message("FAILED").orderNo(null).orderActionId(null).messageDescription(errorMessage).build();
        }
        return CeaseLineResponse.builder().httpCode(500).message("FAILED").build();
    }

    public OrderStatusView getStatus(String customerId, String productOfferId) {
        return ceaseOrderRepo.findByCustomerIdAndProductOfferId(customerId, productOfferId)
            .map(orderRequest -> OrderStatusView.builder()
                .seqNo(orderRequest.getSeqNo())
                .bpmOrderId(orderRequest.getBpmOrderId())
                .lineId(orderRequest.getLineId())
                .customerId(orderRequest.getCustomerId())
                .productOfferId(orderRequest.getProductOfferId())
                .reqCreateTime(orderRequest.getReqCreateTime())
                .requestResponseTime(orderRequest.getRequestResponseTime())
                .status(orderRequest.getStatus().name())
                .billerOrderCreationTime(orderRequest.getBillerOrderCreationTime())
                .billerOrderNo(orderRequest.getBillerOrderNo())
                .billerOrderActionNo(orderRequest.getBillerOrderActionNo())
                .statusUpdateDate(orderRequest.getStatusUpdateDate())
                .statusUpdateSource(orderRequest.getStatusUpdateSource())
                .rejectReason(orderRequest.getRejectReason())
                .attemptCount(orderRequest.getAttemptCount())
                .submittedAttemptsRemaining(orderRequest.getSubmittedAttemptsRemaining())
                .completedAttemptsRemaining(orderRequest.getCompletedAttemptsRemaining())
                .nextPollAt(orderRequest.getNextPollAt())
                .lastPolledAt(orderRequest.getLastPolledAt())
                .lockUntil(orderRequest.getLockUntil())
                .lockedBy(orderRequest.getLockedBy())
                .updateContext(orderRequest.getUpdateContext())
                .trackingId(orderRequest.getTrackingId())
                .routeEnv(orderRequest.getRouteEnv())
                .build())
            .orElse(null);
    }

    private String extractImplErrorInfo(final String messageDescription) throws Exception {
        JsonNode root = mapper.readTree(messageDescription);
        JsonNode implErrorInfo = root.path("errors").get(0)
            .path("nestedError").path("ImplErrorInfo");
        return mapper.writeValueAsString(implErrorInfo);
    }

    private CeaseLineResponse buildSuccessResponseIfOrderCompleted(String customerId, String productOfferId, Long seqNo) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        Optional<OrderRequest> orderRequest = ceaseOrderRepo.findByCustomerIdAndProductOfferId(customerId, productOfferId);
        if (orderRequest.isPresent() && orderRequest.get().getStatus().equals(OrderStatus.SUCCESS)) {
            log.warn("Returning background process status: success for seqNo={} trackingId{}", seqNo, trackingId);
            return CeaseLineResponse.builder()
                .httpCode(200)
                .message("SUCCESS")
                .orderNo(orderRequest.get().getBillerOrderNo())
                .orderActionId(orderRequest.get().getBillerOrderActionNo())
                .build();
        }
        return null;
    }

    private IopCaseRequest buildIopCaseRequest(String customerId, String billerOrderNo, String billerOrderActionNo, String productOfferId, String bpmOrderId) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String uDescription =String.format(
            "Voluntary Cease order has been stuck..\n\nCustomerID:%s\nProductOfferID:%s\nBPMOrderID:%s\nOrderID:%s\nOrderActionID:%s",
            customerId, productOfferId, bpmOrderId, billerOrderNo, billerOrderActionNo
        );
        log.info("AMA case description: {} trackingID{}", uDescription, trackingId);
        String correlation = String.format("%s:%s:%s", trackingId, billerOrderNo, billerOrderActionNo);

        return IopCaseRequest.builder()
            .uOpenedFor("")
            .uAssignmentGroup("")
            .uSymptoms("-")
            .uCategory("XM - Order Management")
            .uSubcategory("Stuck Order")
            .uChannel("API")
            .uShortDescription(" Voluntary Cease - Stuck Order")
            .uDescription(uDescription)
            .uPriority("High")
            .uCorrelationId(correlation)
            .uAssignedTo(ServiceConfig.getCeaseLineAssignToUser())
            .build();
    }

    private IopCaseRequest buildIopCaseRequestWithState(
        String uNumber,
        String uAssignedTo,
        String uState
    ) {
        return IopCaseRequest.builder()
            .uNumber(uNumber)
            .uAssignedTo(uAssignedTo)
            .uState(uState)
            .build();
    }

    private CaseNumberAndState createCaseAndGetCaseNumber(IopCaseRequest request1, HttpHeaders httpHeaders) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String caseResponse = amaServiceConnector.callAmaService(
            HttpMethod.POST,
            ServiceConfig.getAmaApiCaseUrl(),
            request1,
            httpHeaders,
            String.class
        );

        List<Map<String, Object>> responseList;
        try {
            responseList = mapper.readValue(caseResponse, new TypeReference<List<Map<String, Object>>>() {});
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to parse case response", e);
        }

        if (!responseList.isEmpty()) {
            Map<String, Object> firstItem = responseList.get(0);
            if (firstItem.containsKey("result")) {
                List<ResultItem> resultItems = mapper.convertValue(firstItem.get("result"), new TypeReference<List<ResultItem>>() {});
                ResultItem firstResult = resultItems.get(0);
                log.error("Result status: {}, error: {}", firstResult.getStatus(), firstResult.getError_message());
                throw new CaseCreationException(firstResult.getError_message());
            } else {
                CaseResponse caseResponseHappy = mapper.convertValue(firstItem, CaseResponse.class);
                log.info("Case number: {}, trackingId: {}", caseResponseHappy.getCase_number(),
                    trackingId);
                return new CaseNumberAndState(caseResponseHappy.getCase_number(), caseResponseHappy.getState());
            }
        }
        throw new CaseCreationException("Empty response from AMA service");
    }

    private void fetchCaseStateAsync(IopCaseRequest requestWithState, String caseNumber, HttpHeaders httpHeaders) {
        CompletableFuture.runAsync(() -> {
            try {

                CaseNumberAndState caseNumberAndState = createCaseAndGetCaseNumber(requestWithState, httpHeaders);
                if (caseNumberAndState != null) {
                    log.info("Fetched case state for fire and forget AMA in progress {}: {}. Built request with state: {}",
                        caseNumberAndState.getCaseNumber(), caseNumberAndState.getState(), requestWithState);
                }
            } catch (Exception e) {
                log.warn("Failed to fetch case state for fire and forget for in progress scenario  {}: {}", caseNumber, e.getMessage());
            }
        });
    }

    private CeaseLineResponse buildPendingOrderResponse400(String orderId, String reason, String caseNumber) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String messageDesc = amaCaseEnabled && caseNumber != null
            ? reason + " Case Number: " + caseNumber
            : reason;
        log.info("Returning PENDING-ORDER response 400 for orderId: {}, messageDesc: {}, trackingId{}", orderId, messageDesc,
            trackingId);
        return CeaseLineResponse.builder()
            .httpCode(400)
            .message("PENDING-ORDER")
            .orderNo(orderId)
            .orderActionId(null)
            .messageDescription(messageDesc)
            .build();
    }

    private CeaseLineResponse buildAcceptedOrderResponse202(String orderId, final String orderActionId, String reason, String caseNumber) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String messageDesc = amaCaseEnabled && caseNumber != null
            ? reason + " Case Number: " + caseNumber
            : reason;
        log.info("Returning ACCEPTED response 202 for orderId: {}, messageDesc: {}, trackingId{}", orderId, messageDesc,
            trackingId);
        return CeaseLineResponse.builder()
            .httpCode(202)
            .message("ACCEPTED")
            .orderNo(orderId)
            .orderActionId(orderActionId)
            .messageDescription(messageDesc)
            .build();
    }

    private String maybeCreateAmaCase(String customerId, OrderRequest entity, String productOfferId, String bpmOrderId, HttpHeaders httpHeaders) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        if (!amaCaseEnabled) return null;
        IopCaseRequest request = buildIopCaseRequest(customerId, entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), productOfferId, bpmOrderId);
        String caseNumber = createCaseAndGetCaseNumber(request, httpHeaders).getCaseNumber();
        IopCaseRequest requestWithState = buildIopCaseRequestWithState(caseNumber, ServiceConfig.getCeaseLineAssignToUser(), "15");
        log.info("Created case in AMA for stuck cease order. Order ID: {}, Action ID: {}, Case Number: {}, trackingId{}",
            entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), caseNumber, trackingId);
        // Clone headers and set tracking ID explicitly
        HttpHeaders asyncHeaders = new HttpHeaders();
        asyncHeaders.putAll(httpHeaders);
        asyncHeaders.set(CEASE_LINE_TRACKING_ID, trackingId);

        // Guard the async call
        if (fetchStateAsyncEnabled) {
            fetchCaseStateAsync(requestWithState, caseNumber, asyncHeaders);
        } else {
            log.info("Async case state fetching for in progress state is disabled for case: {}", caseNumber);
        }
        return caseNumber;
    }
added 3015 characters in body
Source Link
yoda
  • 277
  • 1
  • 5
@Slf4j
@RefreshScope
public class PollWorker {

enter code here
    private final PollingDao pollingDao;
    private final OrderRequestRepository ceaseOrderRepo;
    private final PollNotifier pollNotifier;
    private final PollService pollService;
    @Autowired
    private ServiceConfiguration config;

    @Value("${.worker.batch-size}")
    private int batch;

    @Value("${.worker.lock-ttl-seconds}")
    private int lockTtl;

    @Value("${.worker.instance-id}")
    private String instanceId;

    @Autowired
    @Qualifier("db.polling.executor")
    private ScheduledExecutorService executor;

    @Scheduled(fixedDelayString = "${.worker.fixed-delay-ms}")
    public void pollPendingRows() {
        if (config.getBoolean(".worker.polling.enabled", false)) {
            String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
            log.info("Polling at {} for pending rows with batch size {} and lock TTL {} seconds", now, batch, lockTtl);
            List<Long> ids = pollingDao.claimPendingRows(batch, instanceId, lockTtl);
            for (Long id : ids) {
                startPoll(id);
            }
        }
    }

    public void startPoll(Long seqNo) {
        executor.submit(() -> doPoll(seqNo, 0));
    }

    private void doPoll(Long seqNo, int planIndex) {
        try {
            log.info("Polling seqNo={} at planIndex={}", seqNo, planIndex);
            pollService.handlePoll(seqNo);

            int nextDelay = pollService.getNextDelay(seqNo, planIndex);
            if (nextDelay > 0) {
                executor.schedule(() -> doPoll(seqNo, planIndex + 1), nextDelay, TimeUnit.SECONDS);
            } else {
                OrderRequest entity = ceaseOrderRepo.findBySeqNo(seqNo).orElse(null);
                CeaseLineResponse result = CeaseLineResponse.builder()
                    .httpCode(200)
                    .message("SUCCESS")
                    .orderNo(entity != null ? entity.getBillerOrderNo() : null)
                    .orderActionId(entity != null ? entity.getBillerOrderActionNo() : null)
                    .messageDescription("Order processed successfully")
                    .build();

                pollNotifier.complete(seqNo, result);

            }
        } catch (Exception ex) {
            log.error("Polling error on seqNo {}: {}", seqNo, ex.toString());
            pollNotifier.completeExceptionally(seqNo, ex);
        }
    }
}```

**PollNotifier**

```@Component

PollNotifier

@Component
@Slf4j
public class PollNotifier {

    private final Map<Long, CompletableFuture<CeaseLineResponse>> pending = new ConcurrentHashMap<>();

    public CompletableFuture<CeaseLineResponse> register(Long seqNo) {
        CompletableFuture<CeaseLineResponse> future = new CompletableFuture<>();
        pending.put(seqNo, future);
        return future;
    }

    public void complete(Long seqNo, CeaseLineResponse response) {
        CompletableFuture<CeaseLineResponse> future = pending.remove(seqNo);
        if (future != null) {
            future.complete(response);
        } else {
            log.warn("No future found for seqNo {}", seqNo);
        }
    }

    public void completeExceptionally(Long seqNo, Throwable ex) {
        CompletableFuture<CeaseLineResponse> future = pending.remove(seqNo);
        if (future != null) {
            future.completeExceptionally(ex);
        }
    }

}```

**PollPlans**
}
```@RefreshScope

PollPlans

@RefreshScope
@Getter
public class PollPlans {
    private final int[] completedPlan;
    private final int[] submittedPlan;
    private final int initialDelaySeconds;

    public PollPlans(
        @Value("${.poll.plan.completed}") String completed,
        @Value("${.poll.plan.submitted}") String submitted,
        @Value("${.queue.initial-delay-seconds}") int initialDelaySeconds) {
        this.completedPlan = Arrays.stream(completed.split(",")).map(String::trim).mapToInt(Integer::parseInt).toArray();
        this.submittedPlan = Arrays.stream(submitted.split(",")).map(String::trim).mapToInt(Integer::parseInt).toArray();
        this.initialDelaySeconds = initialDelaySeconds;
    }
}```

**PollService**

```@RequiredArgsConstructor

PollService

@RequiredArgsConstructor
@Slf4j
public class PollService {

    private final OrderRequestRepository ceaseOrderRepo;
    private final AuditService auditService;
    private final OrderProcessor ;
    private final PollPlans plans;
    private final PollNotifier pollNotifier;
    private final PollingDao pollingDao;

    @Value("${.poll.plan.completed}") String completedPlan;
    @Value("${.poll.plan.submitted}") String submittedPlan;

    @Transactional
    public void handlePoll(Long seqNo) {
        final OrderRequest orderRequest = ceaseOrderRepo.findById(seqNo).orElseThrow();

        if (orderRequest.getStatus() == OrderStatus.SUBMITTED) {
            log.info("Inside handlePoll for seqNo={} trackingId{}", seqNo, orderRequest.getTrackingId());
            if (orderRequest.getBillerOrderNo() == null) {
                final OrderStatusAudit audit = auditService.audit(seqNo, "POLL_SKIP", "No billerOrderNo");
                if (audit != null) log.info("BillerOrderNo null from DB. Audit: {} trackingId={}", audit, orderRequest.getTrackingId());
                return;
            }
            try {
                OrderStatusResponse orderStatus = .getCeaseOrderStatus(orderRequest.getCustomerId(), orderRequest.getBillerOrderNo()
                    , orderRequest.getRouteEnv(), orderRequest.getSc(), orderRequest.getTrackingId());

                String status = orderStatus != null && orderStatus.getMessage() != null ? orderStatus.getMessage().getStatus() : null;
                String id = orderStatus != null && orderStatus.getMessage() != null ? orderStatus.getMessage().getId() : null;

                final OrderStatusAudit audit = auditService.audit(seqNo, "API_GET", " getOrderStatus OK: orderId=" + id + " status=" + status);
                if (audit != null) log.info("After getOrderStatus from . Audit: {} trackingId={}", audit, orderRequest.getTrackingId());

                if (id != null && !id.equals(orderRequest.getBillerOrderNo())) {
                    final OrderStatusAudit pollWarn = auditService.audit(seqNo, "POLL_WARN", "ID mismatch expected=" + orderRequest.getBillerOrderNo() + " got=" + id);
                    if (pollWarn != null) log.info("ID mismatch expected. Audit: {} trackingId={}", pollWarn, orderRequest.getTrackingId());
                }

                if ("CLOSED".equalsIgnoreCase(status)) {
                    orderRequest.setStatus(OrderStatus.SUCCESS);
                    orderRequest.setStatusUpdateDate(Instant.now());
                    orderRequest.setStatusUpdateSource("API");
                    orderRequest.setNextPollAt(null);
                    orderRequest.setLockUntil(null);
                    orderRequest.setLockedBy(null);
                    orderRequest.setSubmittedAttemptsRemaining(orderRequest.getSubmittedAttemptsRemaining() - 1);
                    orderRequest.setAttemptCount(orderRequest.getAttemptCount() + 1);
                    final OrderRequest save = ceaseOrderRepo.save(orderRequest);
                    log.info("Obtained CLOSED status. Updated row: {} trackingId={}", save, orderRequest.getTrackingId());
                    final OrderStatusAudit audit1 = auditService.audit(seqNo, "UPDATE", "COMPLETED via API");
                    if (audit1 != null) log.info("Obtained CLOSED status. Audit: {} trackingId={}", audit1, orderRequest.getTrackingId());

                    pollNotifier.complete(seqNo, CeaseLineResponse.builder()
                        .httpCode(200)
                        .message("SUCCESS")
                        .orderNo(orderRequest.getBillerOrderNo())
                        .orderActionId(orderRequest.getBillerOrderActionNo())
                        .build());
                } else {
                    int idx = plans.getSubmittedPlan().length - orderRequest.getSubmittedAttemptsRemaining() + 1;
                    int nextSec = (idx < plans.getSubmittedPlan().length) ? plans.getSubmittedPlan()[idx] : 0;

                    if (orderRequest.getSubmittedAttemptsRemaining() > 1) {
                        int rows = pollingDao.decrementSubmittedRemaining(seqNo, nextSec);
                        final OrderStatusAudit update = auditService.audit(seqNo, "UPDATE", "SUBMITTED still pending; rows=" + rows + "; next in " + nextSec + "s");
                        if (update != null) log.info("SUBMITTED status. Audit: {} trackingId={}", update, orderRequest.getTrackingId());
                    } else {
                        orderRequest.setNextPollAt(null);
                        orderRequest.setLockUntil(null);
                        orderRequest.setLockedBy(null);
                        orderRequest.setSubmittedAttemptsRemaining(orderRequest.getSubmittedAttemptsRemaining() - 1);
                        orderRequest.setAttemptCount(orderRequest.getAttemptCount() + 1);
                        final OrderRequest save = ceaseOrderRepo.save(orderRequest);
                        log.info("SUBMITTED (polling finished). Updated row: {} trackingId={}", save, orderRequest.getTrackingId());
                        final OrderStatusAudit update = auditService.audit(seqNo, "UPDATE", "SUBMITTED (polling finished)");
                        if (update != null) log.info("SUBMITTED (polling finished). Audit: {} trackingId={}", update, orderRequest.getTrackingId());

                        pollNotifier.complete(seqNo, CeaseLineResponse.builder()
                            .httpCode(202)
                            .message("ACCEPTED")
                            .orderNo(orderRequest.getBillerOrderNo())
                            .orderActionId(orderRequest.getBillerOrderActionNo())
                            .messageDescription("Order created but still processing")
                            .build());
                    }
                }
            } catch (Exception ex) {
                orderRequest.setStatus(OrderStatus.FAILED);
                orderRequest.setRejectReason("Poll error: " + ex.getMessage());
                orderRequest.setNextPollAt(null);
                orderRequest.setLockUntil(null);
                orderRequest.setLockedBy(null);
                final OrderRequest save = ceaseOrderRepo.save(orderRequest);
                log.info("Obtained FAILED status. Updated row: {} trackingId{}", save, orderRequest.getTrackingId());
                final OrderStatusAudit update = auditService.audit(seqNo, "UPDATE", "FAILED on poll: " + ex.getMessage());
                if (update != null) log.info("Obtained FAILED status. Audit: {} trackingId={}", update, orderRequest.getTrackingId());

                pollNotifier.complete(seqNo, CeaseLineResponse.builder()
                    .httpCode(500)
                    .message("FAILED")
                    .orderNo(orderRequest.getBillerOrderNo())
                    .orderActionId(orderRequest.getBillerOrderActionNo())
                    .messageDescription(orderRequest.getRejectReason())
                    .build());
            }
        }
    }

    public int getNextDelay(Long seqNo, int planIndex) {
        List<Integer> plan = getPlanForSeqNo(seqNo);
        if (planIndex < plan.size() - 1) {
            return plan.get(planIndex + 1);
        }
        return 0;
    }

    private List<Integer> getPlanForSeqNo(Long seqNo) {
        Optional<OrderRequest> optEntity = ceaseOrderRepo.findById(seqNo);
        if (optEntity.isEmpty()) {
            log.warn("No OrderRequest found for seqNo={}", seqNo);
            return Collections.emptyList();
        }

        OrderRequest entity = optEntity.get();

        String planStr;
        if (entity.getStatus() == OrderStatus.SUBMITTED) planStr = submittedPlan;
        else if (entity.getStatus() == OrderStatus.COMPLETED) planStr = completedPlan;
        else planStr = submittedPlan;

        return Arrays.stream(planStr.split(","))
            .map(String::trim)
            .map(Integer::parseInt)
            .collect(Collectors.toList());
    }

 
}```

**PollingDao**

```@RequiredArgsConstructor}

PollingDao

@RequiredArgsConstructor
@Slf4j
public class PollingDao {

    private final JdbcTemplate jdbc;

    @Transactional
    public List<Long> claimPendingRows(int batch, String instanceId, int lockTtlSeconds) {
        String sel = ""
            + "SELECT seq_no FROM order_requests WHERE status in ('SUBMITTED') AND next_poll_at <= CURRENT_TIMESTAMP LIMIT ? FOR UPDATE SKIP LOCKED";

        List<Long> ids = jdbc.query(sel, ps -> ps.setInt(1, batch),
            (ResultSet rs, int rowNum) -> rs.getLong(1));

        if (ids.isEmpty()) return ids;

        StringBuilder placeholders = new StringBuilder();
        for (int i=0;i<ids.size();i++){ if(i>0) placeholders.append(","); placeholders.append("?"); }

        String lockUpdate = ""
            + "UPDATE order_requests "
            + "SET lock_until = DATE_ADD(UTC_TIMESTAMP(), INTERVAL ? SECOND), locked_by = ?, last_polled_at = UTC_TIMESTAMP() "
            + "WHERE seq_no IN (" + placeholders + ")";

        jdbc.update(con -> {
            final PreparedStatement ps = con.prepareStatement(lockUpdate);
            int idx = 1;
            ps.setInt(idx++, lockTtlSeconds);
            ps.setString(idx++, instanceId);
            for (Long id : ids) ps.setLong(idx++, id);
            return ps;
        });

        log.info("Claimed rows {} by {}", ids, instanceId);
        return ids;
    }

    public int decrementSubmittedRemaining(Long seqNo, int secondsToNext) {
        String sql = "UPDATE order_requests SET submitted_attempts_remaining=submitted_attempts_remaining-1, "
            + " next_poll_at = DATE_ADD(UTC_TIMESTAMP(), INTERVAL ? SECOND), "
            + "attempt_count = attempt_count + 1 "
            + "WHERE seq_no=? AND submitted_attempts_remaining>0";
        return jdbc.update(sql, secondsToNext, seqNo);
    }
}```

**Config**

```@Bean

Config

@Bean(name = "db.polling.executor")
public ScheduledExecutorService executor() {
    ThreadFactory threadFactory = new ThreadFactory() {
        final AtomicLong count = new AtomicLong(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("DB-polling-Thread-" + count.getAndIncrement());
            thread.setPriority(Thread.MAX_PRIORITY);
            return thread;
        }
    };
    RejectedExecutionHandler rejectionHandler = (runnable, executor) -> LOGGER.info(
        runnable.toString() + " Subscription queue is full. Active Threads are: {} and work queue length is {} ",
        executor.getActiveCount(),
        executor.getQueue().size()
    );

    ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(
        Integer.parseInt(config.getString(DB_POLLING_THREAD_COREPOOL_SIZE, "20")),
        threadFactory,
        rejectionHandler
    );
    scheduledExecutor.setMaximumPoolSize(Integer.parseInt(config.getString(DB_POLLING_THREAD_MAXPOOL_SIZE, "50")));
    scheduledExecutor.setKeepAliveTime(Long.parseLong(config.getString(DB_POLLING_THREAD_TIMETOLIVE, "300000")), TimeUnit.MILLISECONDS);
    scheduledExecutor.setRemoveOnCancelPolicy(true);
    return scheduledExecutor;
}```

**OrderRequestRepository**

```Repository

OrderRequestRepository

@Repository
public interface OrderRequestRepository extends JpaRepository<OrderRequest, Long> {

    Optional<OrderRequest> findBySeqNo(Long seqNo);

    Optional<OrderRequest> findByCustomerIdAndProductOfferId(String customerId, String productOfferId);

    @Modifying
    @Query("update OrderRequest o set o.lockUntil = :lockUntil, o.lockedBy = :lockedBy where o.seqNo = :seqNo")
    int lockRow(Long seqNo, Instant lockUntil, String lockedBy);

    @Modifying
    @Query("update OrderRequest o set o.status = :status, o.statusUpdateDate = :updateDate, o.statusUpdateSource=:source where o.seqNo = :seqNo")
    int updateStatus(Long seqNo, OrderStatus status, Instant updateDate, String source);

    Optional<OrderRequest> findByBillerOrderNoAndBillerOrderActionNo(String billerOrderNo, String billerOrderActionNo);
} ```

**Controller**

```@PreAuthorize

Controller

@PreAuthorize("#oauth2.hasScope('::ceaseorder')")
@PostMapping("/cease/ceaseline")
public ResponseEntity<CeaseLineResponse> createCeaseOrder(
   
    @RequestHeader("customerId") String customerId,
    @RequestParam(value = Constants.SALES_CHANNEL, required = false, defaultValue = Constants.SALES_CHANNEL_DEFAULT) String sc,
    @RequestParam(value = Constants.QUERY_PARAM_LOCALE, required = false,defaultValue = Constants.LOCALE_DEFAULT) String lo,
    @Valid @RequestBody CreateAndSubmitSuspendResumeCeaseOrderRequest request) {
    if ((routeEnv == null || routeEnv.isEmpty())) {
        LOGGER.info("routeEnv is : {}",routeEnv);
        ServiceContext.put(CEASE_LINE_ROUTE_ENV, DefaultEnv);
    }else{
        LOGGER.info("routeEnv is : {}",routeEnv);
        ServiceContext.put(CEASE_LINE_ROUTE_ENV, routeEnv);
    }
    ServiceContext.put(CEASE_LINE_TRACKING_ID, trackingId);
    ServiceContext.put(KEY_API_NAME, "ceaseline");
    ServiceContext.put(SALES_CHANNEL, sc);
    CeaseLineResponse resp = service.createCeaseOrder(customerId, request, sc, lo);
    return ResponseEntity.status(resp.getHttpCode()).body(resp);
}```

**OrderService**

OrderService

```public class OrderService {

    private final OrderRequestRepository ceaseOrderRepo;
    private final AuditService auditService;
    private final OrderProcessor ;
    private final PollPlans plans;
    private final PollNotifier pollNotifier;

    @Value("${app.case.fetch-state-async.enabled:false}")
    private boolean fetchStateAsyncEnabled;

    @Value("${cease.future.timeout-seconds:10}")
    private int futureTimeoutSeconds;
    private final OrderRequestService orderRequestService;
    @Autowired
    private ServiceConfiguration config;
    @Autowired
    private ObjectMapper mapper;

    @Autowired
    private AmaServiceConnector amaServiceConnector;

    @Autowired
    private ServiceConfig ServiceConfig;

    @Value("${ama.case.enabled:false}")
    private boolean amaCaseEnabled;

    public CeaseLineResponse createCeaseOrder(String customerId, CreateAndSubmitSuspendResumeCeaseOrderRequest req,String sc,String lo) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.add(CEASE_LINE_TRACKING_ID,trackingId);
        httpHeaders.add(CEASE_LINE_ROUTE_ENV, ServiceContext.getString(CEASE_LINE_ROUTE_ENV));

        final CreateAndSubmitSuspendResumeCeaseOrderRequest.Payload payload = req.getPayload();
        final CreateAndSubmitSuspendResumeCeaseOrderRequest.RequestProductOfferDetail pod = payload.getRequestProductOfferDetails().get(0);
        String productOfferId = pod.getProductOfferID();
        OrderRequest entity = null;
        try {
            CeaseOrderResponse cor = .createCeaseOrder(customerId, req, sc, lo);

            String productOrderId = null;
            String productOrderActionIdx9 = null;

            if (cor != null
                && cor.getOutput() != null
                && cor.getOutput().getResponseProductOfferDetail() != null
                && !cor.getOutput().getResponseProductOfferDetail().isEmpty()
                && cor.getOutput().getResponseProductOfferDetail().get(0) != null) {
                productOrderId = cor.getOutput().getResponseProductOfferDetail().get(0).getProductOrderId();
                productOrderActionIdx9 = cor.getOutput().getResponseProductOfferDetail().get(0).getProductOrderActionIdx9();
            }

            entity = orderRequestService.persistNewOrderIfAbsent(customerId, productOfferId, payload, plans, StringUtils.isNotBlank(productOrderId) ? productOrderId : null, StringUtils.isNotBlank(productOrderActionIdx9) ? productOrderActionIdx9 : null);

            if (cor == null || cor.getOutput() == null || cor.getOutput().getResponseProductOfferDetail() == null || cor.getOutput().getResponseProductOfferDetail().isEmpty()) {
                if (null != entity) orderRequestService.markRejected(entity, "Empty response from ");
                return CeaseLineResponse.builder().httpCode(500).message("FAILED").orderNo(null).orderActionId(null).build();
            } else
                log.info("Obtained ceaseOrderResponse response from  for productOfferId: {}, trackingId: {},sc:{}", productOfferId, trackingId, sc);

            final List<CeaseOrderResponse.ResponseProductOfferDetail> productOfferDetail = cor.getOutput().getResponseProductOfferDetail();
            final CeaseOrderResponse.ResponseProductOfferDetail responseProductOfferDetail = productOfferDetail.get(0);

            final String errorCode = responseProductOfferDetail.getErrorCode();
            final String reason = responseProductOfferDetail.getReason();

            if (StringUtils.isNotBlank(errorCode) && StringUtils.isNotBlank(reason)) {
                // UC - 4
                if ("CEASED_PRODUCT".equalsIgnoreCase(errorCode) && reason.contains("Cease cannot be initiated on Ceased product")) {
                    log.info("Cease on Cease from DB. orderId={}, actionId={} trackingID={}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), trackingId);
                    return CeaseLineResponse.builder().httpCode(200).message("SUCCESS").orderNo(entity.getBillerOrderNo()).orderActionId(entity.getBillerOrderActionNo()).messageDescription(config.getString("cease.on.cease.message.description")).build();
                }

                // UC - 2
                if ("3009".equalsIgnoreCase(errorCode) && reason.contains("Existing Pending Orders exist on the subscriber")) {
                    Matcher matcher = Pattern.compile("(?:Suspend Order Id|Order Id)\\s*:\\s*(\\d+)").matcher(reason);
                    String orderId = null;
                    if (matcher.find()) orderId = matcher.group(1);
                    log.info("OrderId obtained from existing pending order from  response. orderId={}", orderId);
                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId{}", entity.getBillerOrderNo(),
                        entity.getBillerOrderActionNo(), trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildPendingOrderResponse400(orderId, reason, caseNumber);
                }
                //new uc
                if (("92000167".equalsIgnoreCase(errorCode) && reason != null && reason.contains("Pending order"))) {
                    Matcher matcher = Pattern.compile("Pending order (\\d+) found with reason (\\w+) that is not allowed to be cancelled").matcher(reason);
                    String orderId = null;
                    if (matcher.find()) orderId = matcher.group(1);
                    log.info("OrderId obtained from existing pending order from  response. orderId={}", orderId);
                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId{}", entity.getBillerOrderNo(),
                        entity.getBillerOrderActionNo(), trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildPendingOrderResponse400(orderId, reason, caseNumber);
                }
                // UC - 3
                if ("CEASED_PRODUCT".equalsIgnoreCase(errorCode) && reason.contains("Pending OA ID")) {
                    Matcher matcher = Pattern.compile("Pending OA ID\\s*:\\s*(\\d+)").matcher(reason);
                    String oaId = null;
                    if (matcher.find()) oaId = matcher.group(1);
                    log.info("Pending OA ID from  response: {}", oaId);
                    log.info("Cease on Pending OA from DB. Order ID: {}, Action ID: {}, trackingId: {}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), trackingId);
                    log.info("OrderActionId match check - : {}, DB: {}", oaId, entity.getBillerOrderActionNo());
                    return CeaseLineResponse.builder().httpCode(200).message("SUCCESS").orderNo(entity.getBillerOrderNo()).orderActionId(oaId).messageDescription(reason).build();
                }

                // US - 6, other failures
                if (("3009".equalsIgnoreCase(errorCode) && reason.contains("Invalid reason code")) || (("2000103".equalsIgnoreCase(errorCode) && reason.contains("Product with provided identifier could not be found")))) {
                    log.info("Bad request from . errorCode={}, reason={}, trackingId={}", errorCode, reason, trackingId);
                    return CeaseLineResponse.builder().httpCode(400).message("BAD-REQUEST").orderNo(null).orderActionId(null).messageDescription(reason).build();
                }

                // US - 6, other failures - with no reason code check
                if (StringUtils.isNotBlank(errorCode)) {
                    log.info("Biller error from . errorCode={}, reason={}, trackingId={}", errorCode, reason, trackingId);
                    return CeaseLineResponse.builder().httpCode(500).message("BILLER-ERROR").orderNo(null).orderActionId(null).messageDescription(reason).build();
                }
            }

            // UC - 1 & #5
            final String productOrderReferenceNumber = responseProductOfferDetail.getProductOrderReferenceNumber();
            if (StringUtils.isNotBlank(productOrderReferenceNumber) && entity != null) {
                entity.setStatus(OrderStatus.SUBMITTED);
                entity.setBillerOrderNo(responseProductOfferDetail.getProductOrderId());
                entity.setBillerOrderActionNo(responseProductOfferDetail.getProductOrderActionIdx9());
                entity.setBillerOrderCreationTime(Instant.now());
                entity.setRequestResponseTime(Instant.now());
                entity.setAttemptCount(entity.getAttemptCount());
                final OrderRequest save = ceaseOrderRepo.save(entity);
                log.info("Cease acknowledged by , productOrderReferenceNumber={}. Updated row: {}, trackingId: {}", productOrderReferenceNumber, save, trackingId);
                final OrderStatusAudit audit = auditService.audit(entity.getSeqNo(), "API_POST", " createCeaseOrder OK: order=" + responseProductOfferDetail.getProductOrderId());
                if (audit != null)
                    log.info("Cease acknowledged by , productOrderReferenceNumber={}. Audit: {}. TrackingID: {}", productOrderReferenceNumber, audit, trackingId);
            }

            if (entity != null && entity.getSeqNo() != null) {
                try {
                    log.info("Waiting for polling result for seqNo={} trackingId={}", entity.getSeqNo(), trackingId);
                    CompletableFuture<CeaseLineResponse> future = pollNotifier.register(entity.getSeqNo());
                    return future.get(futureTimeoutSeconds, TimeUnit.SECONDS);
                } catch (TimeoutException te) {
                    log.warn("Polling did not complete in time for seqNo={} trackingId{}", entity.getSeqNo(), trackingId);
                    CeaseLineResponse response = buildSuccessResponseIfOrderCompleted(customerId, productOfferId, entity.getSeqNo());
                    if (response != null) return response;

                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId: {}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(),
                        trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildAcceptedOrderResponse202(entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), "Order created but still processing", caseNumber);
                } catch (Exception e) {
                    log.error("Error while waiting for poll result", e);
                    CeaseLineResponse response = buildSuccessResponseIfOrderCompleted(customerId, productOfferId, entity.getSeqNo());
                    if (response != null) return response;

                    return CeaseLineResponse.builder()
                        .httpCode(500)
                        .message("FAILED")
                        .orderNo(entity.getBillerOrderNo())
                        .orderActionId(entity.getBillerOrderActionNo())
                        .build();
                }
            }
        } catch (Exception ex) {
            String errorDetailMessage = null;
            if (ex instanceof ErrorResponseFromExternalException) {
                Throwable causedBy = ((ErrorResponseFromExternalException) ex).getCausedBy();
                if (causedBy instanceof HttpServerErrorException.InternalServerError) {
                    errorDetailMessage = ((HttpServerErrorException.InternalServerError) causedBy).getResponseBodyAsString();
                }
            }
            String errorMessage = null;
            if (null != errorDetailMessage) {
                try {
                    errorMessage = extractImplErrorInfo(errorDetailMessage);
                    log.error("Failed to create cease order for productOfferId: {}, message={}, trackingId{}", productOfferId, errorMessage,
                        trackingId);
                    if (null != entity) {
                        String errorMsg = " create error: " + (errorMessage != null ? errorMessage : ex.getMessage());
                        orderRequestService.markRejected(entity, errorMsg);
                    }
                } catch (Exception e) {
                    log.error("Failed to extractImplErrorInfo, message={}, exception={} trackingId{}", e.getMessage(), e, trackingId);
                }
            }
            return CeaseLineResponse.builder().httpCode(500).message("FAILED").orderNo(null).orderActionId(null).messageDescription(errorMessage).build();
        }
        return CeaseLineResponse.builder().httpCode(500).message("FAILED").build();
    }

    public OrderStatusView getStatus(String customerId, String productOfferId) {
        return ceaseOrderRepo.findByCustomerIdAndProductOfferId(customerId, productOfferId)
            .map(orderRequest -> OrderStatusView.builder()
                .seqNo(orderRequest.getSeqNo())
                .bpmOrderId(orderRequest.getBpmOrderId())
                .lineId(orderRequest.getLineId())
                .customerId(orderRequest.getCustomerId())
                .productOfferId(orderRequest.getProductOfferId())
                .reqCreateTime(orderRequest.getReqCreateTime())
                .requestResponseTime(orderRequest.getRequestResponseTime())
                .status(orderRequest.getStatus().name())
                .billerOrderCreationTime(orderRequest.getBillerOrderCreationTime())
                .billerOrderNo(orderRequest.getBillerOrderNo())
                .billerOrderActionNo(orderRequest.getBillerOrderActionNo())
                .statusUpdateDate(orderRequest.getStatusUpdateDate())
                .statusUpdateSource(orderRequest.getStatusUpdateSource())
                .rejectReason(orderRequest.getRejectReason())
                .attemptCount(orderRequest.getAttemptCount())
                .submittedAttemptsRemaining(orderRequest.getSubmittedAttemptsRemaining())
                .completedAttemptsRemaining(orderRequest.getCompletedAttemptsRemaining())
                .nextPollAt(orderRequest.getNextPollAt())
                .lastPolledAt(orderRequest.getLastPolledAt())
                .lockUntil(orderRequest.getLockUntil())
                .lockedBy(orderRequest.getLockedBy())
                .updateContext(orderRequest.getUpdateContext())
                .trackingId(orderRequest.getTrackingId())
                .routeEnv(orderRequest.getRouteEnv())
                .build())
            .orElse(null);
    }

    private String extractImplErrorInfo(final String messageDescription) throws Exception {
        JsonNode root = mapper.readTree(messageDescription);
        JsonNode implErrorInfo = root.path("errors").get(0)
            .path("nestedError").path("ImplErrorInfo");
        return mapper.writeValueAsString(implErrorInfo);
    }

    private CeaseLineResponse buildSuccessResponseIfOrderCompleted(String customerId, String productOfferId, Long seqNo) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        Optional<OrderRequest> orderRequest = ceaseOrderRepo.findByCustomerIdAndProductOfferId(customerId, productOfferId);
        if (orderRequest.isPresent() && orderRequest.get().getStatus().equals(OrderStatus.SUCCESS)) {
            log.warn("Returning background process status: success for seqNo={} trackingId{}", seqNo, trackingId);
            return CeaseLineResponse.builder()
                .httpCode(200)
                .message("SUCCESS")
                .orderNo(orderRequest.get().getBillerOrderNo())
                .orderActionId(orderRequest.get().getBillerOrderActionNo())
                .build();
        }
        return null;
    }

    private IopCaseRequest buildIopCaseRequest(String customerId, String billerOrderNo, String billerOrderActionNo, String productOfferId, String bpmOrderId) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String uDescription =String.format(
            "Voluntary Cease order has been stuck..\n\nCustomerID:%s\nProductOfferID:%s\nBPMOrderID:%s\nOrderID:%s\nOrderActionID:%s",
            customerId, productOfferId, bpmOrderId, billerOrderNo, billerOrderActionNo
        );
        log.info("AMA case description: {} trackingID{}", uDescription, trackingId);
        String correlation = String.format("%s:%s:%s", trackingId, billerOrderNo, billerOrderActionNo);

        return IopCaseRequest.builder()
            .uOpenedFor("")
            .uAssignmentGroup("")
            .uSymptoms("-")
            .uCategory("XM - Order Management")
            .uSubcategory("Stuck Order")
            .uChannel("API")
            .uShortDescription(" Voluntary Cease - Stuck Order")
            .uDescription(uDescription)
            .uPriority("High")
            .uCorrelationId(correlation)
            .uAssignedTo(ServiceConfig.getCeaseLineAssignToUser())
            .build();
    }

    private IopCaseRequest buildIopCaseRequestWithState(
        String uNumber,
        String uAssignedTo,
        String uState
    ) {
        return IopCaseRequest.builder()
            .uNumber(uNumber)
            .uAssignedTo(uAssignedTo)
            .uState(uState)
            .build();
    }

    private CaseNumberAndState createCaseAndGetCaseNumber(IopCaseRequest request1, HttpHeaders httpHeaders) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String caseResponse = amaServiceConnector.callAmaService(
            HttpMethod.POST,
            ServiceConfig.getAmaApiCaseUrl(),
            request1,
            httpHeaders,
            String.class
        );

        List<Map<String, Object>> responseList;
        try {
            responseList = mapper.readValue(caseResponse, new TypeReference<List<Map<String, Object>>>() {});
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to parse case response", e);
        }

        if (!responseList.isEmpty()) {
            Map<String, Object> firstItem = responseList.get(0);
            if (firstItem.containsKey("result")) {
                List<ResultItem> resultItems = mapper.convertValue(firstItem.get("result"), new TypeReference<List<ResultItem>>() {});
                ResultItem firstResult = resultItems.get(0);
                log.error("Result status: {}, error: {}", firstResult.getStatus(), firstResult.getError_message());
                throw new CaseCreationException(firstResult.getError_message());
            } else {
                CaseResponse caseResponseHappy = mapper.convertValue(firstItem, CaseResponse.class);
                log.info("Case number: {}, trackingId: {}", caseResponseHappy.getCase_number(),
                    trackingId);
                return new CaseNumberAndState(caseResponseHappy.getCase_number(), caseResponseHappy.getState());
            }
        }
        throw new CaseCreationException("Empty response from AMA service");
    }

    private void fetchCaseStateAsync(IopCaseRequest requestWithState, String caseNumber, HttpHeaders httpHeaders) {
        CompletableFuture.runAsync(() -> {
            try {

                CaseNumberAndState caseNumberAndState = createCaseAndGetCaseNumber(requestWithState, httpHeaders);
                if (caseNumberAndState != null) {
                    log.info("Fetched case state for fire and forget AMA in progress {}: {}. Built request with state: {}",
                        caseNumberAndState.getCaseNumber(), caseNumberAndState.getState(), requestWithState);
                }
            } catch (Exception e) {
                log.warn("Failed to fetch case state for fire and forget for in progress scenario  {}: {}", caseNumber, e.getMessage());
            }
        });
    }

    private CeaseLineResponse buildPendingOrderResponse400(String orderId, String reason, String caseNumber) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String messageDesc = amaCaseEnabled && caseNumber != null
            ? reason + " Case Number: " + caseNumber
            : reason;
        log.info("Returning PENDING-ORDER response 400 for orderId: {}, messageDesc: {}, trackingId{}", orderId, messageDesc,
            trackingId);
        return CeaseLineResponse.builder()
            .httpCode(400)
            .message("PENDING-ORDER")
            .orderNo(orderId)
            .orderActionId(null)
            .messageDescription(messageDesc)
            .build();
    }

    private CeaseLineResponse buildAcceptedOrderResponse202(String orderId, final String orderActionId, String reason, String caseNumber) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String messageDesc = amaCaseEnabled && caseNumber != null
            ? reason + " Case Number: " + caseNumber
            : reason;
        log.info("Returning ACCEPTED response 202 for orderId: {}, messageDesc: {}, trackingId{}", orderId, messageDesc,
            trackingId);
        return CeaseLineResponse.builder()
            .httpCode(202)
            .message("ACCEPTED")
            .orderNo(orderId)
            .orderActionId(orderActionId)
            .messageDescription(messageDesc)
            .build();
    }

    private String maybeCreateAmaCase(String customerId, OrderRequest entity, String productOfferId, String bpmOrderId, HttpHeaders httpHeaders) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        if (!amaCaseEnabled) return null;
        IopCaseRequest request = buildIopCaseRequest(customerId, entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), productOfferId, bpmOrderId);
        String caseNumber = createCaseAndGetCaseNumber(request, httpHeaders).getCaseNumber();
        IopCaseRequest requestWithState = buildIopCaseRequestWithState(caseNumber, ServiceConfig.getCeaseLineAssignToUser(), "15");
        log.info("Created case in AMA for stuck cease order. Order ID: {}, Action ID: {}, Case Number: {}, trackingId{}",
            entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), caseNumber, trackingId);
        // Clone headers and set tracking ID explicitly
        HttpHeaders asyncHeaders = new HttpHeaders();
        asyncHeaders.putAll(httpHeaders);
        asyncHeaders.set(CEASE_LINE_TRACKING_ID, trackingId);

        // Guard the async call
        if (fetchStateAsyncEnabled) {
            fetchCaseStateAsync(requestWithState, caseNumber, asyncHeaders);
        } else {
            log.info("Async case state fetching for in progress state is disabled for case: {}", caseNumber);
        }
        return caseNumber;
    }```
@RefreshScope
public class PollWorker {

    private final PollingDao pollingDao;
    private final OrderRequestRepository ceaseOrderRepo;
    private final PollNotifier pollNotifier;
    private final PollService pollService;
    @Autowired
    private ServiceConfiguration config;

    @Value("${.worker.batch-size}")
    private int batch;

    @Value("${.worker.lock-ttl-seconds}")
    private int lockTtl;

    @Value("${.worker.instance-id}")
    private String instanceId;

    @Autowired
    @Qualifier("db.polling.executor")
    private ScheduledExecutorService executor;

    @Scheduled(fixedDelayString = "${.worker.fixed-delay-ms}")
    public void pollPendingRows() {
        if (config.getBoolean(".worker.polling.enabled", false)) {
            String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
            log.info("Polling at {} for pending rows with batch size {} and lock TTL {} seconds", now, batch, lockTtl);
            List<Long> ids = pollingDao.claimPendingRows(batch, instanceId, lockTtl);
            for (Long id : ids) {
                startPoll(id);
            }
        }
    }

    public void startPoll(Long seqNo) {
        executor.submit(() -> doPoll(seqNo, 0));
    }

    private void doPoll(Long seqNo, int planIndex) {
        try {
            log.info("Polling seqNo={} at planIndex={}", seqNo, planIndex);
            pollService.handlePoll(seqNo);

            int nextDelay = pollService.getNextDelay(seqNo, planIndex);
            if (nextDelay > 0) {
                executor.schedule(() -> doPoll(seqNo, planIndex + 1), nextDelay, TimeUnit.SECONDS);
            } else {
                OrderRequest entity = ceaseOrderRepo.findBySeqNo(seqNo).orElse(null);
                CeaseLineResponse result = CeaseLineResponse.builder()
                    .httpCode(200)
                    .message("SUCCESS")
                    .orderNo(entity != null ? entity.getBillerOrderNo() : null)
                    .orderActionId(entity != null ? entity.getBillerOrderActionNo() : null)
                    .messageDescription("Order processed successfully")
                    .build();

                pollNotifier.complete(seqNo, result);

            }
        } catch (Exception ex) {
            log.error("Polling error on seqNo {}: {}", seqNo, ex.toString());
            pollNotifier.completeExceptionally(seqNo, ex);
        }
    }
}```

**PollNotifier**

```@Component
@Slf4j
public class PollNotifier {

    private final Map<Long, CompletableFuture<CeaseLineResponse>> pending = new ConcurrentHashMap<>();

    public CompletableFuture<CeaseLineResponse> register(Long seqNo) {
        CompletableFuture<CeaseLineResponse> future = new CompletableFuture<>();
        pending.put(seqNo, future);
        return future;
    }

    public void complete(Long seqNo, CeaseLineResponse response) {
        CompletableFuture<CeaseLineResponse> future = pending.remove(seqNo);
        if (future != null) {
            future.complete(response);
        } else {
            log.warn("No future found for seqNo {}", seqNo);
        }
    }

    public void completeExceptionally(Long seqNo, Throwable ex) {
        CompletableFuture<CeaseLineResponse> future = pending.remove(seqNo);
        if (future != null) {
            future.completeExceptionally(ex);
        }
    }

}```

**PollPlans**

```@RefreshScope
@Getter
public class PollPlans {
    private final int[] completedPlan;
    private final int[] submittedPlan;
    private final int initialDelaySeconds;

    public PollPlans(
        @Value("${.poll.plan.completed}") String completed,
        @Value("${.poll.plan.submitted}") String submitted,
        @Value("${.queue.initial-delay-seconds}") int initialDelaySeconds) {
        this.completedPlan = Arrays.stream(completed.split(",")).map(String::trim).mapToInt(Integer::parseInt).toArray();
        this.submittedPlan = Arrays.stream(submitted.split(",")).map(String::trim).mapToInt(Integer::parseInt).toArray();
        this.initialDelaySeconds = initialDelaySeconds;
    }
}```

**PollService**

```@RequiredArgsConstructor
@Slf4j
public class PollService {

    private final OrderRequestRepository ceaseOrderRepo;
    private final AuditService auditService;
    private final OrderProcessor ;
    private final PollPlans plans;
    private final PollNotifier pollNotifier;
    private final PollingDao pollingDao;

    @Value("${.poll.plan.completed}") String completedPlan;
    @Value("${.poll.plan.submitted}") String submittedPlan;

    @Transactional
    public void handlePoll(Long seqNo) {
        final OrderRequest orderRequest = ceaseOrderRepo.findById(seqNo).orElseThrow();

        if (orderRequest.getStatus() == OrderStatus.SUBMITTED) {
            log.info("Inside handlePoll for seqNo={} trackingId{}", seqNo, orderRequest.getTrackingId());
            if (orderRequest.getBillerOrderNo() == null) {
                final OrderStatusAudit audit = auditService.audit(seqNo, "POLL_SKIP", "No billerOrderNo");
                if (audit != null) log.info("BillerOrderNo null from DB. Audit: {} trackingId={}", audit, orderRequest.getTrackingId());
                return;
            }
            try {
                OrderStatusResponse orderStatus = .getCeaseOrderStatus(orderRequest.getCustomerId(), orderRequest.getBillerOrderNo()
                    , orderRequest.getRouteEnv(), orderRequest.getSc(), orderRequest.getTrackingId());

                String status = orderStatus != null && orderStatus.getMessage() != null ? orderStatus.getMessage().getStatus() : null;
                String id = orderStatus != null && orderStatus.getMessage() != null ? orderStatus.getMessage().getId() : null;

                final OrderStatusAudit audit = auditService.audit(seqNo, "API_GET", " getOrderStatus OK: orderId=" + id + " status=" + status);
                if (audit != null) log.info("After getOrderStatus from . Audit: {} trackingId={}", audit, orderRequest.getTrackingId());

                if (id != null && !id.equals(orderRequest.getBillerOrderNo())) {
                    final OrderStatusAudit pollWarn = auditService.audit(seqNo, "POLL_WARN", "ID mismatch expected=" + orderRequest.getBillerOrderNo() + " got=" + id);
                    if (pollWarn != null) log.info("ID mismatch expected. Audit: {} trackingId={}", pollWarn, orderRequest.getTrackingId());
                }

                if ("CLOSED".equalsIgnoreCase(status)) {
                    orderRequest.setStatus(OrderStatus.SUCCESS);
                    orderRequest.setStatusUpdateDate(Instant.now());
                    orderRequest.setStatusUpdateSource("API");
                    orderRequest.setNextPollAt(null);
                    orderRequest.setLockUntil(null);
                    orderRequest.setLockedBy(null);
                    orderRequest.setSubmittedAttemptsRemaining(orderRequest.getSubmittedAttemptsRemaining() - 1);
                    orderRequest.setAttemptCount(orderRequest.getAttemptCount() + 1);
                    final OrderRequest save = ceaseOrderRepo.save(orderRequest);
                    log.info("Obtained CLOSED status. Updated row: {} trackingId={}", save, orderRequest.getTrackingId());
                    final OrderStatusAudit audit1 = auditService.audit(seqNo, "UPDATE", "COMPLETED via API");
                    if (audit1 != null) log.info("Obtained CLOSED status. Audit: {} trackingId={}", audit1, orderRequest.getTrackingId());

                    pollNotifier.complete(seqNo, CeaseLineResponse.builder()
                        .httpCode(200)
                        .message("SUCCESS")
                        .orderNo(orderRequest.getBillerOrderNo())
                        .orderActionId(orderRequest.getBillerOrderActionNo())
                        .build());
                } else {
                    int idx = plans.getSubmittedPlan().length - orderRequest.getSubmittedAttemptsRemaining() + 1;
                    int nextSec = (idx < plans.getSubmittedPlan().length) ? plans.getSubmittedPlan()[idx] : 0;

                    if (orderRequest.getSubmittedAttemptsRemaining() > 1) {
                        int rows = pollingDao.decrementSubmittedRemaining(seqNo, nextSec);
                        final OrderStatusAudit update = auditService.audit(seqNo, "UPDATE", "SUBMITTED still pending; rows=" + rows + "; next in " + nextSec + "s");
                        if (update != null) log.info("SUBMITTED status. Audit: {} trackingId={}", update, orderRequest.getTrackingId());
                    } else {
                        orderRequest.setNextPollAt(null);
                        orderRequest.setLockUntil(null);
                        orderRequest.setLockedBy(null);
                        orderRequest.setSubmittedAttemptsRemaining(orderRequest.getSubmittedAttemptsRemaining() - 1);
                        orderRequest.setAttemptCount(orderRequest.getAttemptCount() + 1);
                        final OrderRequest save = ceaseOrderRepo.save(orderRequest);
                        log.info("SUBMITTED (polling finished). Updated row: {} trackingId={}", save, orderRequest.getTrackingId());
                        final OrderStatusAudit update = auditService.audit(seqNo, "UPDATE", "SUBMITTED (polling finished)");
                        if (update != null) log.info("SUBMITTED (polling finished). Audit: {} trackingId={}", update, orderRequest.getTrackingId());

                        pollNotifier.complete(seqNo, CeaseLineResponse.builder()
                            .httpCode(202)
                            .message("ACCEPTED")
                            .orderNo(orderRequest.getBillerOrderNo())
                            .orderActionId(orderRequest.getBillerOrderActionNo())
                            .messageDescription("Order created but still processing")
                            .build());
                    }
                }
            } catch (Exception ex) {
                orderRequest.setStatus(OrderStatus.FAILED);
                orderRequest.setRejectReason("Poll error: " + ex.getMessage());
                orderRequest.setNextPollAt(null);
                orderRequest.setLockUntil(null);
                orderRequest.setLockedBy(null);
                final OrderRequest save = ceaseOrderRepo.save(orderRequest);
                log.info("Obtained FAILED status. Updated row: {} trackingId{}", save, orderRequest.getTrackingId());
                final OrderStatusAudit update = auditService.audit(seqNo, "UPDATE", "FAILED on poll: " + ex.getMessage());
                if (update != null) log.info("Obtained FAILED status. Audit: {} trackingId={}", update, orderRequest.getTrackingId());

                pollNotifier.complete(seqNo, CeaseLineResponse.builder()
                    .httpCode(500)
                    .message("FAILED")
                    .orderNo(orderRequest.getBillerOrderNo())
                    .orderActionId(orderRequest.getBillerOrderActionNo())
                    .messageDescription(orderRequest.getRejectReason())
                    .build());
            }
        }
    }

    public int getNextDelay(Long seqNo, int planIndex) {
        List<Integer> plan = getPlanForSeqNo(seqNo);
        if (planIndex < plan.size() - 1) {
            return plan.get(planIndex + 1);
        }
        return 0;
    }

    private List<Integer> getPlanForSeqNo(Long seqNo) {
        Optional<OrderRequest> optEntity = ceaseOrderRepo.findById(seqNo);
        if (optEntity.isEmpty()) {
            log.warn("No OrderRequest found for seqNo={}", seqNo);
            return Collections.emptyList();
        }

        OrderRequest entity = optEntity.get();

        String planStr;
        if (entity.getStatus() == OrderStatus.SUBMITTED) planStr = submittedPlan;
        else if (entity.getStatus() == OrderStatus.COMPLETED) planStr = completedPlan;
        else planStr = submittedPlan;

        return Arrays.stream(planStr.split(","))
            .map(String::trim)
            .map(Integer::parseInt)
            .collect(Collectors.toList());
    }

 
}```

**PollingDao**

```@RequiredArgsConstructor
@Slf4j
public class PollingDao {

    private final JdbcTemplate jdbc;

    @Transactional
    public List<Long> claimPendingRows(int batch, String instanceId, int lockTtlSeconds) {
        String sel = ""
            + "SELECT seq_no FROM order_requests WHERE status in ('SUBMITTED') AND next_poll_at <= CURRENT_TIMESTAMP LIMIT ? FOR UPDATE SKIP LOCKED";

        List<Long> ids = jdbc.query(sel, ps -> ps.setInt(1, batch),
            (ResultSet rs, int rowNum) -> rs.getLong(1));

        if (ids.isEmpty()) return ids;

        StringBuilder placeholders = new StringBuilder();
        for (int i=0;i<ids.size();i++){ if(i>0) placeholders.append(","); placeholders.append("?"); }

        String lockUpdate = ""
            + "UPDATE order_requests "
            + "SET lock_until = DATE_ADD(UTC_TIMESTAMP(), INTERVAL ? SECOND), locked_by = ?, last_polled_at = UTC_TIMESTAMP() "
            + "WHERE seq_no IN (" + placeholders + ")";

        jdbc.update(con -> {
            final PreparedStatement ps = con.prepareStatement(lockUpdate);
            int idx = 1;
            ps.setInt(idx++, lockTtlSeconds);
            ps.setString(idx++, instanceId);
            for (Long id : ids) ps.setLong(idx++, id);
            return ps;
        });

        log.info("Claimed rows {} by {}", ids, instanceId);
        return ids;
    }

    public int decrementSubmittedRemaining(Long seqNo, int secondsToNext) {
        String sql = "UPDATE order_requests SET submitted_attempts_remaining=submitted_attempts_remaining-1, "
            + " next_poll_at = DATE_ADD(UTC_TIMESTAMP(), INTERVAL ? SECOND), "
            + "attempt_count = attempt_count + 1 "
            + "WHERE seq_no=? AND submitted_attempts_remaining>0";
        return jdbc.update(sql, secondsToNext, seqNo);
    }
}```

**Config**

```@Bean(name = "db.polling.executor")
public ScheduledExecutorService executor() {
    ThreadFactory threadFactory = new ThreadFactory() {
        final AtomicLong count = new AtomicLong(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("DB-polling-Thread-" + count.getAndIncrement());
            thread.setPriority(Thread.MAX_PRIORITY);
            return thread;
        }
    };
    RejectedExecutionHandler rejectionHandler = (runnable, executor) -> LOGGER.info(
        runnable.toString() + " Subscription queue is full. Active Threads are: {} and work queue length is {} ",
        executor.getActiveCount(),
        executor.getQueue().size()
    );

    ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(
        Integer.parseInt(config.getString(DB_POLLING_THREAD_COREPOOL_SIZE, "20")),
        threadFactory,
        rejectionHandler
    );
    scheduledExecutor.setMaximumPoolSize(Integer.parseInt(config.getString(DB_POLLING_THREAD_MAXPOOL_SIZE, "50")));
    scheduledExecutor.setKeepAliveTime(Long.parseLong(config.getString(DB_POLLING_THREAD_TIMETOLIVE, "300000")), TimeUnit.MILLISECONDS);
    scheduledExecutor.setRemoveOnCancelPolicy(true);
    return scheduledExecutor;
}```

**OrderRequestRepository**

```Repository
public interface OrderRequestRepository extends JpaRepository<OrderRequest, Long> {

    Optional<OrderRequest> findBySeqNo(Long seqNo);

    Optional<OrderRequest> findByCustomerIdAndProductOfferId(String customerId, String productOfferId);

    @Modifying
    @Query("update OrderRequest o set o.lockUntil = :lockUntil, o.lockedBy = :lockedBy where o.seqNo = :seqNo")
    int lockRow(Long seqNo, Instant lockUntil, String lockedBy);

    @Modifying
    @Query("update OrderRequest o set o.status = :status, o.statusUpdateDate = :updateDate, o.statusUpdateSource=:source where o.seqNo = :seqNo")
    int updateStatus(Long seqNo, OrderStatus status, Instant updateDate, String source);

    Optional<OrderRequest> findByBillerOrderNoAndBillerOrderActionNo(String billerOrderNo, String billerOrderActionNo);
} ```

**Controller**

```@PreAuthorize("#oauth2.hasScope('::ceaseorder')")
@PostMapping("/cease/ceaseline")
public ResponseEntity<CeaseLineResponse> createCeaseOrder(
   
    @RequestHeader("customerId") String customerId,
    @RequestParam(value = Constants.SALES_CHANNEL, required = false, defaultValue = Constants.SALES_CHANNEL_DEFAULT) String sc,
    @RequestParam(value = Constants.QUERY_PARAM_LOCALE, required = false,defaultValue = Constants.LOCALE_DEFAULT) String lo,
    @Valid @RequestBody CreateAndSubmitSuspendResumeCeaseOrderRequest request) {
    if ((routeEnv == null || routeEnv.isEmpty())) {
        LOGGER.info("routeEnv is : {}",routeEnv);
        ServiceContext.put(CEASE_LINE_ROUTE_ENV, DefaultEnv);
    }else{
        LOGGER.info("routeEnv is : {}",routeEnv);
        ServiceContext.put(CEASE_LINE_ROUTE_ENV, routeEnv);
    }
    ServiceContext.put(CEASE_LINE_TRACKING_ID, trackingId);
    ServiceContext.put(KEY_API_NAME, "ceaseline");
    ServiceContext.put(SALES_CHANNEL, sc);
    CeaseLineResponse resp = service.createCeaseOrder(customerId, request, sc, lo);
    return ResponseEntity.status(resp.getHttpCode()).body(resp);
}```

**OrderService**

```public class OrderService {

    private final OrderRequestRepository ceaseOrderRepo;
    private final AuditService auditService;
    private final OrderProcessor ;
    private final PollPlans plans;
    private final PollNotifier pollNotifier;

    @Value("${app.case.fetch-state-async.enabled:false}")
    private boolean fetchStateAsyncEnabled;

    @Value("${cease.future.timeout-seconds:10}")
    private int futureTimeoutSeconds;
    private final OrderRequestService orderRequestService;
    @Autowired
    private ServiceConfiguration config;
    @Autowired
    private ObjectMapper mapper;

    @Autowired
    private AmaServiceConnector amaServiceConnector;

    @Autowired
    private ServiceConfig ServiceConfig;

    @Value("${ama.case.enabled:false}")
    private boolean amaCaseEnabled;

    public CeaseLineResponse createCeaseOrder(String customerId, CreateAndSubmitSuspendResumeCeaseOrderRequest req,String sc,String lo) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.add(CEASE_LINE_TRACKING_ID,trackingId);
        httpHeaders.add(CEASE_LINE_ROUTE_ENV, ServiceContext.getString(CEASE_LINE_ROUTE_ENV));

        final CreateAndSubmitSuspendResumeCeaseOrderRequest.Payload payload = req.getPayload();
        final CreateAndSubmitSuspendResumeCeaseOrderRequest.RequestProductOfferDetail pod = payload.getRequestProductOfferDetails().get(0);
        String productOfferId = pod.getProductOfferID();
        OrderRequest entity = null;
        try {
            CeaseOrderResponse cor = .createCeaseOrder(customerId, req, sc, lo);

            String productOrderId = null;
            String productOrderActionIdx9 = null;

            if (cor != null
                && cor.getOutput() != null
                && cor.getOutput().getResponseProductOfferDetail() != null
                && !cor.getOutput().getResponseProductOfferDetail().isEmpty()
                && cor.getOutput().getResponseProductOfferDetail().get(0) != null) {
                productOrderId = cor.getOutput().getResponseProductOfferDetail().get(0).getProductOrderId();
                productOrderActionIdx9 = cor.getOutput().getResponseProductOfferDetail().get(0).getProductOrderActionIdx9();
            }

            entity = orderRequestService.persistNewOrderIfAbsent(customerId, productOfferId, payload, plans, StringUtils.isNotBlank(productOrderId) ? productOrderId : null, StringUtils.isNotBlank(productOrderActionIdx9) ? productOrderActionIdx9 : null);

            if (cor == null || cor.getOutput() == null || cor.getOutput().getResponseProductOfferDetail() == null || cor.getOutput().getResponseProductOfferDetail().isEmpty()) {
                if (null != entity) orderRequestService.markRejected(entity, "Empty response from ");
                return CeaseLineResponse.builder().httpCode(500).message("FAILED").orderNo(null).orderActionId(null).build();
            } else
                log.info("Obtained ceaseOrderResponse response from  for productOfferId: {}, trackingId: {},sc:{}", productOfferId, trackingId, sc);

            final List<CeaseOrderResponse.ResponseProductOfferDetail> productOfferDetail = cor.getOutput().getResponseProductOfferDetail();
            final CeaseOrderResponse.ResponseProductOfferDetail responseProductOfferDetail = productOfferDetail.get(0);

            final String errorCode = responseProductOfferDetail.getErrorCode();
            final String reason = responseProductOfferDetail.getReason();

            if (StringUtils.isNotBlank(errorCode) && StringUtils.isNotBlank(reason)) {
                // UC - 4
                if ("CEASED_PRODUCT".equalsIgnoreCase(errorCode) && reason.contains("Cease cannot be initiated on Ceased product")) {
                    log.info("Cease on Cease from DB. orderId={}, actionId={} trackingID={}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), trackingId);
                    return CeaseLineResponse.builder().httpCode(200).message("SUCCESS").orderNo(entity.getBillerOrderNo()).orderActionId(entity.getBillerOrderActionNo()).messageDescription(config.getString("cease.on.cease.message.description")).build();
                }

                // UC - 2
                if ("3009".equalsIgnoreCase(errorCode) && reason.contains("Existing Pending Orders exist on the subscriber")) {
                    Matcher matcher = Pattern.compile("(?:Suspend Order Id|Order Id)\\s*:\\s*(\\d+)").matcher(reason);
                    String orderId = null;
                    if (matcher.find()) orderId = matcher.group(1);
                    log.info("OrderId obtained from existing pending order from  response. orderId={}", orderId);
                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId{}", entity.getBillerOrderNo(),
                        entity.getBillerOrderActionNo(), trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildPendingOrderResponse400(orderId, reason, caseNumber);
                }
                //new uc
                if (("92000167".equalsIgnoreCase(errorCode) && reason != null && reason.contains("Pending order"))) {
                    Matcher matcher = Pattern.compile("Pending order (\\d+) found with reason (\\w+) that is not allowed to be cancelled").matcher(reason);
                    String orderId = null;
                    if (matcher.find()) orderId = matcher.group(1);
                    log.info("OrderId obtained from existing pending order from  response. orderId={}", orderId);
                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId{}", entity.getBillerOrderNo(),
                        entity.getBillerOrderActionNo(), trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildPendingOrderResponse400(orderId, reason, caseNumber);
                }
                // UC - 3
                if ("CEASED_PRODUCT".equalsIgnoreCase(errorCode) && reason.contains("Pending OA ID")) {
                    Matcher matcher = Pattern.compile("Pending OA ID\\s*:\\s*(\\d+)").matcher(reason);
                    String oaId = null;
                    if (matcher.find()) oaId = matcher.group(1);
                    log.info("Pending OA ID from  response: {}", oaId);
                    log.info("Cease on Pending OA from DB. Order ID: {}, Action ID: {}, trackingId: {}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), trackingId);
                    log.info("OrderActionId match check - : {}, DB: {}", oaId, entity.getBillerOrderActionNo());
                    return CeaseLineResponse.builder().httpCode(200).message("SUCCESS").orderNo(entity.getBillerOrderNo()).orderActionId(oaId).messageDescription(reason).build();
                }

                // US - 6, other failures
                if (("3009".equalsIgnoreCase(errorCode) && reason.contains("Invalid reason code")) || (("2000103".equalsIgnoreCase(errorCode) && reason.contains("Product with provided identifier could not be found")))) {
                    log.info("Bad request from . errorCode={}, reason={}, trackingId={}", errorCode, reason, trackingId);
                    return CeaseLineResponse.builder().httpCode(400).message("BAD-REQUEST").orderNo(null).orderActionId(null).messageDescription(reason).build();
                }

                // US - 6, other failures - with no reason code check
                if (StringUtils.isNotBlank(errorCode)) {
                    log.info("Biller error from . errorCode={}, reason={}, trackingId={}", errorCode, reason, trackingId);
                    return CeaseLineResponse.builder().httpCode(500).message("BILLER-ERROR").orderNo(null).orderActionId(null).messageDescription(reason).build();
                }
            }

            // UC - 1 & #5
            final String productOrderReferenceNumber = responseProductOfferDetail.getProductOrderReferenceNumber();
            if (StringUtils.isNotBlank(productOrderReferenceNumber) && entity != null) {
                entity.setStatus(OrderStatus.SUBMITTED);
                entity.setBillerOrderNo(responseProductOfferDetail.getProductOrderId());
                entity.setBillerOrderActionNo(responseProductOfferDetail.getProductOrderActionIdx9());
                entity.setBillerOrderCreationTime(Instant.now());
                entity.setRequestResponseTime(Instant.now());
                entity.setAttemptCount(entity.getAttemptCount());
                final OrderRequest save = ceaseOrderRepo.save(entity);
                log.info("Cease acknowledged by , productOrderReferenceNumber={}. Updated row: {}, trackingId: {}", productOrderReferenceNumber, save, trackingId);
                final OrderStatusAudit audit = auditService.audit(entity.getSeqNo(), "API_POST", " createCeaseOrder OK: order=" + responseProductOfferDetail.getProductOrderId());
                if (audit != null)
                    log.info("Cease acknowledged by , productOrderReferenceNumber={}. Audit: {}. TrackingID: {}", productOrderReferenceNumber, audit, trackingId);
            }

            if (entity != null && entity.getSeqNo() != null) {
                try {
                    log.info("Waiting for polling result for seqNo={} trackingId={}", entity.getSeqNo(), trackingId);
                    CompletableFuture<CeaseLineResponse> future = pollNotifier.register(entity.getSeqNo());
                    return future.get(futureTimeoutSeconds, TimeUnit.SECONDS);
                } catch (TimeoutException te) {
                    log.warn("Polling did not complete in time for seqNo={} trackingId{}", entity.getSeqNo(), trackingId);
                    CeaseLineResponse response = buildSuccessResponseIfOrderCompleted(customerId, productOfferId, entity.getSeqNo());
                    if (response != null) return response;

                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId: {}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(),
                        trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildAcceptedOrderResponse202(entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), "Order created but still processing", caseNumber);
                } catch (Exception e) {
                    log.error("Error while waiting for poll result", e);
                    CeaseLineResponse response = buildSuccessResponseIfOrderCompleted(customerId, productOfferId, entity.getSeqNo());
                    if (response != null) return response;

                    return CeaseLineResponse.builder()
                        .httpCode(500)
                        .message("FAILED")
                        .orderNo(entity.getBillerOrderNo())
                        .orderActionId(entity.getBillerOrderActionNo())
                        .build();
                }
            }
        } catch (Exception ex) {
            String errorDetailMessage = null;
            if (ex instanceof ErrorResponseFromExternalException) {
                Throwable causedBy = ((ErrorResponseFromExternalException) ex).getCausedBy();
                if (causedBy instanceof HttpServerErrorException.InternalServerError) {
                    errorDetailMessage = ((HttpServerErrorException.InternalServerError) causedBy).getResponseBodyAsString();
                }
            }
            String errorMessage = null;
            if (null != errorDetailMessage) {
                try {
                    errorMessage = extractImplErrorInfo(errorDetailMessage);
                    log.error("Failed to create cease order for productOfferId: {}, message={}, trackingId{}", productOfferId, errorMessage,
                        trackingId);
                    if (null != entity) {
                        String errorMsg = " create error: " + (errorMessage != null ? errorMessage : ex.getMessage());
                        orderRequestService.markRejected(entity, errorMsg);
                    }
                } catch (Exception e) {
                    log.error("Failed to extractImplErrorInfo, message={}, exception={} trackingId{}", e.getMessage(), e, trackingId);
                }
            }
            return CeaseLineResponse.builder().httpCode(500).message("FAILED").orderNo(null).orderActionId(null).messageDescription(errorMessage).build();
        }
        return CeaseLineResponse.builder().httpCode(500).message("FAILED").build();
    }

    public OrderStatusView getStatus(String customerId, String productOfferId) {
        return ceaseOrderRepo.findByCustomerIdAndProductOfferId(customerId, productOfferId)
            .map(orderRequest -> OrderStatusView.builder()
                .seqNo(orderRequest.getSeqNo())
                .bpmOrderId(orderRequest.getBpmOrderId())
                .lineId(orderRequest.getLineId())
                .customerId(orderRequest.getCustomerId())
                .productOfferId(orderRequest.getProductOfferId())
                .reqCreateTime(orderRequest.getReqCreateTime())
                .requestResponseTime(orderRequest.getRequestResponseTime())
                .status(orderRequest.getStatus().name())
                .billerOrderCreationTime(orderRequest.getBillerOrderCreationTime())
                .billerOrderNo(orderRequest.getBillerOrderNo())
                .billerOrderActionNo(orderRequest.getBillerOrderActionNo())
                .statusUpdateDate(orderRequest.getStatusUpdateDate())
                .statusUpdateSource(orderRequest.getStatusUpdateSource())
                .rejectReason(orderRequest.getRejectReason())
                .attemptCount(orderRequest.getAttemptCount())
                .submittedAttemptsRemaining(orderRequest.getSubmittedAttemptsRemaining())
                .completedAttemptsRemaining(orderRequest.getCompletedAttemptsRemaining())
                .nextPollAt(orderRequest.getNextPollAt())
                .lastPolledAt(orderRequest.getLastPolledAt())
                .lockUntil(orderRequest.getLockUntil())
                .lockedBy(orderRequest.getLockedBy())
                .updateContext(orderRequest.getUpdateContext())
                .trackingId(orderRequest.getTrackingId())
                .routeEnv(orderRequest.getRouteEnv())
                .build())
            .orElse(null);
    }

    private String extractImplErrorInfo(final String messageDescription) throws Exception {
        JsonNode root = mapper.readTree(messageDescription);
        JsonNode implErrorInfo = root.path("errors").get(0)
            .path("nestedError").path("ImplErrorInfo");
        return mapper.writeValueAsString(implErrorInfo);
    }

    private CeaseLineResponse buildSuccessResponseIfOrderCompleted(String customerId, String productOfferId, Long seqNo) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        Optional<OrderRequest> orderRequest = ceaseOrderRepo.findByCustomerIdAndProductOfferId(customerId, productOfferId);
        if (orderRequest.isPresent() && orderRequest.get().getStatus().equals(OrderStatus.SUCCESS)) {
            log.warn("Returning background process status: success for seqNo={} trackingId{}", seqNo, trackingId);
            return CeaseLineResponse.builder()
                .httpCode(200)
                .message("SUCCESS")
                .orderNo(orderRequest.get().getBillerOrderNo())
                .orderActionId(orderRequest.get().getBillerOrderActionNo())
                .build();
        }
        return null;
    }

    private IopCaseRequest buildIopCaseRequest(String customerId, String billerOrderNo, String billerOrderActionNo, String productOfferId, String bpmOrderId) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String uDescription =String.format(
            "Voluntary Cease order has been stuck..\n\nCustomerID:%s\nProductOfferID:%s\nBPMOrderID:%s\nOrderID:%s\nOrderActionID:%s",
            customerId, productOfferId, bpmOrderId, billerOrderNo, billerOrderActionNo
        );
        log.info("AMA case description: {} trackingID{}", uDescription, trackingId);
        String correlation = String.format("%s:%s:%s", trackingId, billerOrderNo, billerOrderActionNo);

        return IopCaseRequest.builder()
            .uOpenedFor("")
            .uAssignmentGroup("")
            .uSymptoms("-")
            .uCategory("XM - Order Management")
            .uSubcategory("Stuck Order")
            .uChannel("API")
            .uShortDescription(" Voluntary Cease - Stuck Order")
            .uDescription(uDescription)
            .uPriority("High")
            .uCorrelationId(correlation)
            .uAssignedTo(ServiceConfig.getCeaseLineAssignToUser())
            .build();
    }

    private IopCaseRequest buildIopCaseRequestWithState(
        String uNumber,
        String uAssignedTo,
        String uState
    ) {
        return IopCaseRequest.builder()
            .uNumber(uNumber)
            .uAssignedTo(uAssignedTo)
            .uState(uState)
            .build();
    }

    private CaseNumberAndState createCaseAndGetCaseNumber(IopCaseRequest request1, HttpHeaders httpHeaders) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String caseResponse = amaServiceConnector.callAmaService(
            HttpMethod.POST,
            ServiceConfig.getAmaApiCaseUrl(),
            request1,
            httpHeaders,
            String.class
        );

        List<Map<String, Object>> responseList;
        try {
            responseList = mapper.readValue(caseResponse, new TypeReference<List<Map<String, Object>>>() {});
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to parse case response", e);
        }

        if (!responseList.isEmpty()) {
            Map<String, Object> firstItem = responseList.get(0);
            if (firstItem.containsKey("result")) {
                List<ResultItem> resultItems = mapper.convertValue(firstItem.get("result"), new TypeReference<List<ResultItem>>() {});
                ResultItem firstResult = resultItems.get(0);
                log.error("Result status: {}, error: {}", firstResult.getStatus(), firstResult.getError_message());
                throw new CaseCreationException(firstResult.getError_message());
            } else {
                CaseResponse caseResponseHappy = mapper.convertValue(firstItem, CaseResponse.class);
                log.info("Case number: {}, trackingId: {}", caseResponseHappy.getCase_number(),
                    trackingId);
                return new CaseNumberAndState(caseResponseHappy.getCase_number(), caseResponseHappy.getState());
            }
        }
        throw new CaseCreationException("Empty response from AMA service");
    }

    private void fetchCaseStateAsync(IopCaseRequest requestWithState, String caseNumber, HttpHeaders httpHeaders) {
        CompletableFuture.runAsync(() -> {
            try {

                CaseNumberAndState caseNumberAndState = createCaseAndGetCaseNumber(requestWithState, httpHeaders);
                if (caseNumberAndState != null) {
                    log.info("Fetched case state for fire and forget AMA in progress {}: {}. Built request with state: {}",
                        caseNumberAndState.getCaseNumber(), caseNumberAndState.getState(), requestWithState);
                }
            } catch (Exception e) {
                log.warn("Failed to fetch case state for fire and forget for in progress scenario  {}: {}", caseNumber, e.getMessage());
            }
        });
    }

    private CeaseLineResponse buildPendingOrderResponse400(String orderId, String reason, String caseNumber) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String messageDesc = amaCaseEnabled && caseNumber != null
            ? reason + " Case Number: " + caseNumber
            : reason;
        log.info("Returning PENDING-ORDER response 400 for orderId: {}, messageDesc: {}, trackingId{}", orderId, messageDesc,
            trackingId);
        return CeaseLineResponse.builder()
            .httpCode(400)
            .message("PENDING-ORDER")
            .orderNo(orderId)
            .orderActionId(null)
            .messageDescription(messageDesc)
            .build();
    }

    private CeaseLineResponse buildAcceptedOrderResponse202(String orderId, final String orderActionId, String reason, String caseNumber) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String messageDesc = amaCaseEnabled && caseNumber != null
            ? reason + " Case Number: " + caseNumber
            : reason;
        log.info("Returning ACCEPTED response 202 for orderId: {}, messageDesc: {}, trackingId{}", orderId, messageDesc,
            trackingId);
        return CeaseLineResponse.builder()
            .httpCode(202)
            .message("ACCEPTED")
            .orderNo(orderId)
            .orderActionId(orderActionId)
            .messageDescription(messageDesc)
            .build();
    }

    private String maybeCreateAmaCase(String customerId, OrderRequest entity, String productOfferId, String bpmOrderId, HttpHeaders httpHeaders) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        if (!amaCaseEnabled) return null;
        IopCaseRequest request = buildIopCaseRequest(customerId, entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), productOfferId, bpmOrderId);
        String caseNumber = createCaseAndGetCaseNumber(request, httpHeaders).getCaseNumber();
        IopCaseRequest requestWithState = buildIopCaseRequestWithState(caseNumber, ServiceConfig.getCeaseLineAssignToUser(), "15");
        log.info("Created case in AMA for stuck cease order. Order ID: {}, Action ID: {}, Case Number: {}, trackingId{}",
            entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), caseNumber, trackingId);
        // Clone headers and set tracking ID explicitly
        HttpHeaders asyncHeaders = new HttpHeaders();
        asyncHeaders.putAll(httpHeaders);
        asyncHeaders.set(CEASE_LINE_TRACKING_ID, trackingId);

        // Guard the async call
        if (fetchStateAsyncEnabled) {
            fetchCaseStateAsync(requestWithState, caseNumber, asyncHeaders);
        } else {
            log.info("Async case state fetching for in progress state is disabled for case: {}", caseNumber);
        }
        return caseNumber;
    }```
@Slf4j
@RefreshScope
public class PollWorker {

enter code here
    private final PollingDao pollingDao;
    private final OrderRequestRepository ceaseOrderRepo;
    private final PollNotifier pollNotifier;
    private final PollService pollService;
    @Autowired
    private ServiceConfiguration config;

    @Value("${.worker.batch-size}")
    private int batch;

    @Value("${.worker.lock-ttl-seconds}")
    private int lockTtl;

    @Value("${.worker.instance-id}")
    private String instanceId;

    @Autowired
    @Qualifier("db.polling.executor")
    private ScheduledExecutorService executor;

    @Scheduled(fixedDelayString = "${.worker.fixed-delay-ms}")
    public void pollPendingRows() {
        if (config.getBoolean(".worker.polling.enabled", false)) {
            String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
            log.info("Polling at {} for pending rows with batch size {} and lock TTL {} seconds", now, batch, lockTtl);
            List<Long> ids = pollingDao.claimPendingRows(batch, instanceId, lockTtl);
            for (Long id : ids) {
                startPoll(id);
            }
        }
    }

    public void startPoll(Long seqNo) {
        executor.submit(() -> doPoll(seqNo, 0));
    }

    private void doPoll(Long seqNo, int planIndex) {
        try {
            log.info("Polling seqNo={} at planIndex={}", seqNo, planIndex);
            pollService.handlePoll(seqNo);

            int nextDelay = pollService.getNextDelay(seqNo, planIndex);
            if (nextDelay > 0) {
                executor.schedule(() -> doPoll(seqNo, planIndex + 1), nextDelay, TimeUnit.SECONDS);
            } else {
                OrderRequest entity = ceaseOrderRepo.findBySeqNo(seqNo).orElse(null);
                CeaseLineResponse result = CeaseLineResponse.builder()
                    .httpCode(200)
                    .message("SUCCESS")
                    .orderNo(entity != null ? entity.getBillerOrderNo() : null)
                    .orderActionId(entity != null ? entity.getBillerOrderActionNo() : null)
                    .messageDescription("Order processed successfully")
                    .build();

                pollNotifier.complete(seqNo, result);

            }
        } catch (Exception ex) {
            log.error("Polling error on seqNo {}: {}", seqNo, ex.toString());
            pollNotifier.completeExceptionally(seqNo, ex);
        }
    }
}

PollNotifier

@Component
@Slf4j
public class PollNotifier {

    private final Map<Long, CompletableFuture<CeaseLineResponse>> pending = new ConcurrentHashMap<>();

    public CompletableFuture<CeaseLineResponse> register(Long seqNo) {
        CompletableFuture<CeaseLineResponse> future = new CompletableFuture<>();
        pending.put(seqNo, future);
        return future;
    }

    public void complete(Long seqNo, CeaseLineResponse response) {
        CompletableFuture<CeaseLineResponse> future = pending.remove(seqNo);
        if (future != null) {
            future.complete(response);
        } else {
            log.warn("No future found for seqNo {}", seqNo);
        }
    }

    public void completeExceptionally(Long seqNo, Throwable ex) {
        CompletableFuture<CeaseLineResponse> future = pending.remove(seqNo);
        if (future != null) {
            future.completeExceptionally(ex);
        }
    }

}

PollPlans

@RefreshScope
@Getter
public class PollPlans {
    private final int[] completedPlan;
    private final int[] submittedPlan;
    private final int initialDelaySeconds;

    public PollPlans(
        @Value("${.poll.plan.completed}") String completed,
        @Value("${.poll.plan.submitted}") String submitted,
        @Value("${.queue.initial-delay-seconds}") int initialDelaySeconds) {
        this.completedPlan = Arrays.stream(completed.split(",")).map(String::trim).mapToInt(Integer::parseInt).toArray();
        this.submittedPlan = Arrays.stream(submitted.split(",")).map(String::trim).mapToInt(Integer::parseInt).toArray();
        this.initialDelaySeconds = initialDelaySeconds;
    }
}

PollService

@RequiredArgsConstructor
@Slf4j
public class PollService {

    private final OrderRequestRepository ceaseOrderRepo;
    private final AuditService auditService;
    private final OrderProcessor ;
    private final PollPlans plans;
    private final PollNotifier pollNotifier;
    private final PollingDao pollingDao;

    @Value("${.poll.plan.completed}") String completedPlan;
    @Value("${.poll.plan.submitted}") String submittedPlan;

    @Transactional
    public void handlePoll(Long seqNo) {
        final OrderRequest orderRequest = ceaseOrderRepo.findById(seqNo).orElseThrow();

        if (orderRequest.getStatus() == OrderStatus.SUBMITTED) {
            log.info("Inside handlePoll for seqNo={} trackingId{}", seqNo, orderRequest.getTrackingId());
            if (orderRequest.getBillerOrderNo() == null) {
                final OrderStatusAudit audit = auditService.audit(seqNo, "POLL_SKIP", "No billerOrderNo");
                if (audit != null) log.info("BillerOrderNo null from DB. Audit: {} trackingId={}", audit, orderRequest.getTrackingId());
                return;
            }
            try {
                OrderStatusResponse orderStatus = .getCeaseOrderStatus(orderRequest.getCustomerId(), orderRequest.getBillerOrderNo()
                    , orderRequest.getRouteEnv(), orderRequest.getSc(), orderRequest.getTrackingId());

                String status = orderStatus != null && orderStatus.getMessage() != null ? orderStatus.getMessage().getStatus() : null;
                String id = orderStatus != null && orderStatus.getMessage() != null ? orderStatus.getMessage().getId() : null;

                final OrderStatusAudit audit = auditService.audit(seqNo, "API_GET", " getOrderStatus OK: orderId=" + id + " status=" + status);
                if (audit != null) log.info("After getOrderStatus from . Audit: {} trackingId={}", audit, orderRequest.getTrackingId());

                if (id != null && !id.equals(orderRequest.getBillerOrderNo())) {
                    final OrderStatusAudit pollWarn = auditService.audit(seqNo, "POLL_WARN", "ID mismatch expected=" + orderRequest.getBillerOrderNo() + " got=" + id);
                    if (pollWarn != null) log.info("ID mismatch expected. Audit: {} trackingId={}", pollWarn, orderRequest.getTrackingId());
                }

                if ("CLOSED".equalsIgnoreCase(status)) {
                    orderRequest.setStatus(OrderStatus.SUCCESS);
                    orderRequest.setStatusUpdateDate(Instant.now());
                    orderRequest.setStatusUpdateSource("API");
                    orderRequest.setNextPollAt(null);
                    orderRequest.setLockUntil(null);
                    orderRequest.setLockedBy(null);
                    orderRequest.setSubmittedAttemptsRemaining(orderRequest.getSubmittedAttemptsRemaining() - 1);
                    orderRequest.setAttemptCount(orderRequest.getAttemptCount() + 1);
                    final OrderRequest save = ceaseOrderRepo.save(orderRequest);
                    log.info("Obtained CLOSED status. Updated row: {} trackingId={}", save, orderRequest.getTrackingId());
                    final OrderStatusAudit audit1 = auditService.audit(seqNo, "UPDATE", "COMPLETED via API");
                    if (audit1 != null) log.info("Obtained CLOSED status. Audit: {} trackingId={}", audit1, orderRequest.getTrackingId());

                    pollNotifier.complete(seqNo, CeaseLineResponse.builder()
                        .httpCode(200)
                        .message("SUCCESS")
                        .orderNo(orderRequest.getBillerOrderNo())
                        .orderActionId(orderRequest.getBillerOrderActionNo())
                        .build());
                } else {
                    int idx = plans.getSubmittedPlan().length - orderRequest.getSubmittedAttemptsRemaining() + 1;
                    int nextSec = (idx < plans.getSubmittedPlan().length) ? plans.getSubmittedPlan()[idx] : 0;

                    if (orderRequest.getSubmittedAttemptsRemaining() > 1) {
                        int rows = pollingDao.decrementSubmittedRemaining(seqNo, nextSec);
                        final OrderStatusAudit update = auditService.audit(seqNo, "UPDATE", "SUBMITTED still pending; rows=" + rows + "; next in " + nextSec + "s");
                        if (update != null) log.info("SUBMITTED status. Audit: {} trackingId={}", update, orderRequest.getTrackingId());
                    } else {
                        orderRequest.setNextPollAt(null);
                        orderRequest.setLockUntil(null);
                        orderRequest.setLockedBy(null);
                        orderRequest.setSubmittedAttemptsRemaining(orderRequest.getSubmittedAttemptsRemaining() - 1);
                        orderRequest.setAttemptCount(orderRequest.getAttemptCount() + 1);
                        final OrderRequest save = ceaseOrderRepo.save(orderRequest);
                        log.info("SUBMITTED (polling finished). Updated row: {} trackingId={}", save, orderRequest.getTrackingId());
                        final OrderStatusAudit update = auditService.audit(seqNo, "UPDATE", "SUBMITTED (polling finished)");
                        if (update != null) log.info("SUBMITTED (polling finished). Audit: {} trackingId={}", update, orderRequest.getTrackingId());

                        pollNotifier.complete(seqNo, CeaseLineResponse.builder()
                            .httpCode(202)
                            .message("ACCEPTED")
                            .orderNo(orderRequest.getBillerOrderNo())
                            .orderActionId(orderRequest.getBillerOrderActionNo())
                            .messageDescription("Order created but still processing")
                            .build());
                    }
                }
            } catch (Exception ex) {
                orderRequest.setStatus(OrderStatus.FAILED);
                orderRequest.setRejectReason("Poll error: " + ex.getMessage());
                orderRequest.setNextPollAt(null);
                orderRequest.setLockUntil(null);
                orderRequest.setLockedBy(null);
                final OrderRequest save = ceaseOrderRepo.save(orderRequest);
                log.info("Obtained FAILED status. Updated row: {} trackingId{}", save, orderRequest.getTrackingId());
                final OrderStatusAudit update = auditService.audit(seqNo, "UPDATE", "FAILED on poll: " + ex.getMessage());
                if (update != null) log.info("Obtained FAILED status. Audit: {} trackingId={}", update, orderRequest.getTrackingId());

                pollNotifier.complete(seqNo, CeaseLineResponse.builder()
                    .httpCode(500)
                    .message("FAILED")
                    .orderNo(orderRequest.getBillerOrderNo())
                    .orderActionId(orderRequest.getBillerOrderActionNo())
                    .messageDescription(orderRequest.getRejectReason())
                    .build());
            }
        }
    }

    public int getNextDelay(Long seqNo, int planIndex) {
        List<Integer> plan = getPlanForSeqNo(seqNo);
        if (planIndex < plan.size() - 1) {
            return plan.get(planIndex + 1);
        }
        return 0;
    }

    private List<Integer> getPlanForSeqNo(Long seqNo) {
        Optional<OrderRequest> optEntity = ceaseOrderRepo.findById(seqNo);
        if (optEntity.isEmpty()) {
            log.warn("No OrderRequest found for seqNo={}", seqNo);
            return Collections.emptyList();
        }

        OrderRequest entity = optEntity.get();

        String planStr;
        if (entity.getStatus() == OrderStatus.SUBMITTED) planStr = submittedPlan;
        else if (entity.getStatus() == OrderStatus.COMPLETED) planStr = completedPlan;
        else planStr = submittedPlan;

        return Arrays.stream(planStr.split(","))
            .map(String::trim)
            .map(Integer::parseInt)
            .collect(Collectors.toList());
    }


}

PollingDao

@RequiredArgsConstructor
@Slf4j
public class PollingDao {

    private final JdbcTemplate jdbc;

    @Transactional
    public List<Long> claimPendingRows(int batch, String instanceId, int lockTtlSeconds) {
        String sel = ""
            + "SELECT seq_no FROM order_requests WHERE status in ('SUBMITTED') AND next_poll_at <= CURRENT_TIMESTAMP LIMIT ? FOR UPDATE SKIP LOCKED";

        List<Long> ids = jdbc.query(sel, ps -> ps.setInt(1, batch),
            (ResultSet rs, int rowNum) -> rs.getLong(1));

        if (ids.isEmpty()) return ids;

        StringBuilder placeholders = new StringBuilder();
        for (int i=0;i<ids.size();i++){ if(i>0) placeholders.append(","); placeholders.append("?"); }

        String lockUpdate = ""
            + "UPDATE order_requests "
            + "SET lock_until = DATE_ADD(UTC_TIMESTAMP(), INTERVAL ? SECOND), locked_by = ?, last_polled_at = UTC_TIMESTAMP() "
            + "WHERE seq_no IN (" + placeholders + ")";

        jdbc.update(con -> {
            final PreparedStatement ps = con.prepareStatement(lockUpdate);
            int idx = 1;
            ps.setInt(idx++, lockTtlSeconds);
            ps.setString(idx++, instanceId);
            for (Long id : ids) ps.setLong(idx++, id);
            return ps;
        });

        log.info("Claimed rows {} by {}", ids, instanceId);
        return ids;
    }

    public int decrementSubmittedRemaining(Long seqNo, int secondsToNext) {
        String sql = "UPDATE order_requests SET submitted_attempts_remaining=submitted_attempts_remaining-1, "
            + " next_poll_at = DATE_ADD(UTC_TIMESTAMP(), INTERVAL ? SECOND), "
            + "attempt_count = attempt_count + 1 "
            + "WHERE seq_no=? AND submitted_attempts_remaining>0";
        return jdbc.update(sql, secondsToNext, seqNo);
    }
}

Config

@Bean(name = "db.polling.executor")
public ScheduledExecutorService executor() {
    ThreadFactory threadFactory = new ThreadFactory() {
        final AtomicLong count = new AtomicLong(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("DB-polling-Thread-" + count.getAndIncrement());
            thread.setPriority(Thread.MAX_PRIORITY);
            return thread;
        }
    };
    RejectedExecutionHandler rejectionHandler = (runnable, executor) -> LOGGER.info(
        runnable.toString() + " Subscription queue is full. Active Threads are: {} and work queue length is {} ",
        executor.getActiveCount(),
        executor.getQueue().size()
    );

    ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(
        Integer.parseInt(config.getString(DB_POLLING_THREAD_COREPOOL_SIZE, "20")),
        threadFactory,
        rejectionHandler
    );
    scheduledExecutor.setMaximumPoolSize(Integer.parseInt(config.getString(DB_POLLING_THREAD_MAXPOOL_SIZE, "50")));
    scheduledExecutor.setKeepAliveTime(Long.parseLong(config.getString(DB_POLLING_THREAD_TIMETOLIVE, "300000")), TimeUnit.MILLISECONDS);
    scheduledExecutor.setRemoveOnCancelPolicy(true);
    return scheduledExecutor;
}

OrderRequestRepository

@Repository
public interface OrderRequestRepository extends JpaRepository<OrderRequest, Long> {

    Optional<OrderRequest> findBySeqNo(Long seqNo);

    Optional<OrderRequest> findByCustomerIdAndProductOfferId(String customerId, String productOfferId);

    @Modifying
    @Query("update OrderRequest o set o.lockUntil = :lockUntil, o.lockedBy = :lockedBy where o.seqNo = :seqNo")
    int lockRow(Long seqNo, Instant lockUntil, String lockedBy);

    @Modifying
    @Query("update OrderRequest o set o.status = :status, o.statusUpdateDate = :updateDate, o.statusUpdateSource=:source where o.seqNo = :seqNo")
    int updateStatus(Long seqNo, OrderStatus status, Instant updateDate, String source);

    Optional<OrderRequest> findByBillerOrderNoAndBillerOrderActionNo(String billerOrderNo, String billerOrderActionNo);
} 

Controller

@PreAuthorize("#oauth2.hasScope('::ceaseorder')")
@PostMapping("/cease/ceaseline")
public ResponseEntity<CeaseLineResponse> createCeaseOrder(
   
    @RequestHeader("customerId") String customerId,
    @RequestParam(value = Constants.SALES_CHANNEL, required = false, defaultValue = Constants.SALES_CHANNEL_DEFAULT) String sc,
    @RequestParam(value = Constants.QUERY_PARAM_LOCALE, required = false,defaultValue = Constants.LOCALE_DEFAULT) String lo,
    @Valid @RequestBody CreateAndSubmitSuspendResumeCeaseOrderRequest request) {
    if ((routeEnv == null || routeEnv.isEmpty())) {
        LOGGER.info("routeEnv is : {}",routeEnv);
        ServiceContext.put(CEASE_LINE_ROUTE_ENV, DefaultEnv);
    }else{
        LOGGER.info("routeEnv is : {}",routeEnv);
        ServiceContext.put(CEASE_LINE_ROUTE_ENV, routeEnv);
    }
    ServiceContext.put(CEASE_LINE_TRACKING_ID, trackingId);
    ServiceContext.put(KEY_API_NAME, "ceaseline");
    ServiceContext.put(SALES_CHANNEL, sc);
    CeaseLineResponse resp = service.createCeaseOrder(customerId, request, sc, lo);
    return ResponseEntity.status(resp.getHttpCode()).body(resp);
}

OrderService

```public class OrderService {

    private final OrderRequestRepository ceaseOrderRepo;
    private final AuditService auditService;
    private final OrderProcessor ;
    private final PollPlans plans;
    private final PollNotifier pollNotifier;

    @Value("${app.case.fetch-state-async.enabled:false}")
    private boolean fetchStateAsyncEnabled;

    @Value("${cease.future.timeout-seconds:10}")
    private int futureTimeoutSeconds;
    private final OrderRequestService orderRequestService;
    @Autowired
    private ServiceConfiguration config;
    @Autowired
    private ObjectMapper mapper;

    @Autowired
    private AmaServiceConnector amaServiceConnector;

    @Autowired
    private ServiceConfig ServiceConfig;

    @Value("${ama.case.enabled:false}")
    private boolean amaCaseEnabled;

    public CeaseLineResponse createCeaseOrder(String customerId, CreateAndSubmitSuspendResumeCeaseOrderRequest req,String sc,String lo) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.add(CEASE_LINE_TRACKING_ID,trackingId);
        httpHeaders.add(CEASE_LINE_ROUTE_ENV, ServiceContext.getString(CEASE_LINE_ROUTE_ENV));

        final CreateAndSubmitSuspendResumeCeaseOrderRequest.Payload payload = req.getPayload();
        final CreateAndSubmitSuspendResumeCeaseOrderRequest.RequestProductOfferDetail pod = payload.getRequestProductOfferDetails().get(0);
        String productOfferId = pod.getProductOfferID();
        OrderRequest entity = null;
        try {
            CeaseOrderResponse cor = .createCeaseOrder(customerId, req, sc, lo);

            String productOrderId = null;
            String productOrderActionIdx9 = null;

            if (cor != null
                && cor.getOutput() != null
                && cor.getOutput().getResponseProductOfferDetail() != null
                && !cor.getOutput().getResponseProductOfferDetail().isEmpty()
                && cor.getOutput().getResponseProductOfferDetail().get(0) != null) {
                productOrderId = cor.getOutput().getResponseProductOfferDetail().get(0).getProductOrderId();
                productOrderActionIdx9 = cor.getOutput().getResponseProductOfferDetail().get(0).getProductOrderActionIdx9();
            }

            entity = orderRequestService.persistNewOrderIfAbsent(customerId, productOfferId, payload, plans, StringUtils.isNotBlank(productOrderId) ? productOrderId : null, StringUtils.isNotBlank(productOrderActionIdx9) ? productOrderActionIdx9 : null);

            if (cor == null || cor.getOutput() == null || cor.getOutput().getResponseProductOfferDetail() == null || cor.getOutput().getResponseProductOfferDetail().isEmpty()) {
                if (null != entity) orderRequestService.markRejected(entity, "Empty response from ");
                return CeaseLineResponse.builder().httpCode(500).message("FAILED").orderNo(null).orderActionId(null).build();
            } else
                log.info("Obtained ceaseOrderResponse response from  for productOfferId: {}, trackingId: {},sc:{}", productOfferId, trackingId, sc);

            final List<CeaseOrderResponse.ResponseProductOfferDetail> productOfferDetail = cor.getOutput().getResponseProductOfferDetail();
            final CeaseOrderResponse.ResponseProductOfferDetail responseProductOfferDetail = productOfferDetail.get(0);

            final String errorCode = responseProductOfferDetail.getErrorCode();
            final String reason = responseProductOfferDetail.getReason();

            if (StringUtils.isNotBlank(errorCode) && StringUtils.isNotBlank(reason)) {
                // UC - 4
                if ("CEASED_PRODUCT".equalsIgnoreCase(errorCode) && reason.contains("Cease cannot be initiated on Ceased product")) {
                    log.info("Cease on Cease from DB. orderId={}, actionId={} trackingID={}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), trackingId);
                    return CeaseLineResponse.builder().httpCode(200).message("SUCCESS").orderNo(entity.getBillerOrderNo()).orderActionId(entity.getBillerOrderActionNo()).messageDescription(config.getString("cease.on.cease.message.description")).build();
                }

                // UC - 2
                if ("3009".equalsIgnoreCase(errorCode) && reason.contains("Existing Pending Orders exist on the subscriber")) {
                    Matcher matcher = Pattern.compile("(?:Suspend Order Id|Order Id)\\s*:\\s*(\\d+)").matcher(reason);
                    String orderId = null;
                    if (matcher.find()) orderId = matcher.group(1);
                    log.info("OrderId obtained from existing pending order from  response. orderId={}", orderId);
                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId{}", entity.getBillerOrderNo(),
                        entity.getBillerOrderActionNo(), trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildPendingOrderResponse400(orderId, reason, caseNumber);
                }
                //new uc
                if (("92000167".equalsIgnoreCase(errorCode) && reason != null && reason.contains("Pending order"))) {
                    Matcher matcher = Pattern.compile("Pending order (\\d+) found with reason (\\w+) that is not allowed to be cancelled").matcher(reason);
                    String orderId = null;
                    if (matcher.find()) orderId = matcher.group(1);
                    log.info("OrderId obtained from existing pending order from  response. orderId={}", orderId);
                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId{}", entity.getBillerOrderNo(),
                        entity.getBillerOrderActionNo(), trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildPendingOrderResponse400(orderId, reason, caseNumber);
                }
                // UC - 3
                if ("CEASED_PRODUCT".equalsIgnoreCase(errorCode) && reason.contains("Pending OA ID")) {
                    Matcher matcher = Pattern.compile("Pending OA ID\\s*:\\s*(\\d+)").matcher(reason);
                    String oaId = null;
                    if (matcher.find()) oaId = matcher.group(1);
                    log.info("Pending OA ID from  response: {}", oaId);
                    log.info("Cease on Pending OA from DB. Order ID: {}, Action ID: {}, trackingId: {}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), trackingId);
                    log.info("OrderActionId match check - : {}, DB: {}", oaId, entity.getBillerOrderActionNo());
                    return CeaseLineResponse.builder().httpCode(200).message("SUCCESS").orderNo(entity.getBillerOrderNo()).orderActionId(oaId).messageDescription(reason).build();
                }

                // US - 6, other failures
                if (("3009".equalsIgnoreCase(errorCode) && reason.contains("Invalid reason code")) || (("2000103".equalsIgnoreCase(errorCode) && reason.contains("Product with provided identifier could not be found")))) {
                    log.info("Bad request from . errorCode={}, reason={}, trackingId={}", errorCode, reason, trackingId);
                    return CeaseLineResponse.builder().httpCode(400).message("BAD-REQUEST").orderNo(null).orderActionId(null).messageDescription(reason).build();
                }

                // US - 6, other failures - with no reason code check
                if (StringUtils.isNotBlank(errorCode)) {
                    log.info("Biller error from . errorCode={}, reason={}, trackingId={}", errorCode, reason, trackingId);
                    return CeaseLineResponse.builder().httpCode(500).message("BILLER-ERROR").orderNo(null).orderActionId(null).messageDescription(reason).build();
                }
            }

            // UC - 1 & #5
            final String productOrderReferenceNumber = responseProductOfferDetail.getProductOrderReferenceNumber();
            if (StringUtils.isNotBlank(productOrderReferenceNumber) && entity != null) {
                entity.setStatus(OrderStatus.SUBMITTED);
                entity.setBillerOrderNo(responseProductOfferDetail.getProductOrderId());
                entity.setBillerOrderActionNo(responseProductOfferDetail.getProductOrderActionIdx9());
                entity.setBillerOrderCreationTime(Instant.now());
                entity.setRequestResponseTime(Instant.now());
                entity.setAttemptCount(entity.getAttemptCount());
                final OrderRequest save = ceaseOrderRepo.save(entity);
                log.info("Cease acknowledged by , productOrderReferenceNumber={}. Updated row: {}, trackingId: {}", productOrderReferenceNumber, save, trackingId);
                final OrderStatusAudit audit = auditService.audit(entity.getSeqNo(), "API_POST", " createCeaseOrder OK: order=" + responseProductOfferDetail.getProductOrderId());
                if (audit != null)
                    log.info("Cease acknowledged by , productOrderReferenceNumber={}. Audit: {}. TrackingID: {}", productOrderReferenceNumber, audit, trackingId);
            }

            if (entity != null && entity.getSeqNo() != null) {
                try {
                    log.info("Waiting for polling result for seqNo={} trackingId={}", entity.getSeqNo(), trackingId);
                    CompletableFuture<CeaseLineResponse> future = pollNotifier.register(entity.getSeqNo());
                    return future.get(futureTimeoutSeconds, TimeUnit.SECONDS);
                } catch (TimeoutException te) {
                    log.warn("Polling did not complete in time for seqNo={} trackingId{}", entity.getSeqNo(), trackingId);
                    CeaseLineResponse response = buildSuccessResponseIfOrderCompleted(customerId, productOfferId, entity.getSeqNo());
                    if (response != null) return response;

                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId: {}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(),
                        trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildAcceptedOrderResponse202(entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), "Order created but still processing", caseNumber);
                } catch (Exception e) {
                    log.error("Error while waiting for poll result", e);
                    CeaseLineResponse response = buildSuccessResponseIfOrderCompleted(customerId, productOfferId, entity.getSeqNo());
                    if (response != null) return response;

                    return CeaseLineResponse.builder()
                        .httpCode(500)
                        .message("FAILED")
                        .orderNo(entity.getBillerOrderNo())
                        .orderActionId(entity.getBillerOrderActionNo())
                        .build();
                }
            }
        } catch (Exception ex) {
            String errorDetailMessage = null;
            if (ex instanceof ErrorResponseFromExternalException) {
                Throwable causedBy = ((ErrorResponseFromExternalException) ex).getCausedBy();
                if (causedBy instanceof HttpServerErrorException.InternalServerError) {
                    errorDetailMessage = ((HttpServerErrorException.InternalServerError) causedBy).getResponseBodyAsString();
                }
            }
            String errorMessage = null;
            if (null != errorDetailMessage) {
                try {
                    errorMessage = extractImplErrorInfo(errorDetailMessage);
                    log.error("Failed to create cease order for productOfferId: {}, message={}, trackingId{}", productOfferId, errorMessage,
                        trackingId);
                    if (null != entity) {
                        String errorMsg = " create error: " + (errorMessage != null ? errorMessage : ex.getMessage());
                        orderRequestService.markRejected(entity, errorMsg);
                    }
                } catch (Exception e) {
                    log.error("Failed to extractImplErrorInfo, message={}, exception={} trackingId{}", e.getMessage(), e, trackingId);
                }
            }
            return CeaseLineResponse.builder().httpCode(500).message("FAILED").orderNo(null).orderActionId(null).messageDescription(errorMessage).build();
        }
        return CeaseLineResponse.builder().httpCode(500).message("FAILED").build();
    }

    public OrderStatusView getStatus(String customerId, String productOfferId) {
        return ceaseOrderRepo.findByCustomerIdAndProductOfferId(customerId, productOfferId)
            .map(orderRequest -> OrderStatusView.builder()
                .seqNo(orderRequest.getSeqNo())
                .bpmOrderId(orderRequest.getBpmOrderId())
                .lineId(orderRequest.getLineId())
                .customerId(orderRequest.getCustomerId())
                .productOfferId(orderRequest.getProductOfferId())
                .reqCreateTime(orderRequest.getReqCreateTime())
                .requestResponseTime(orderRequest.getRequestResponseTime())
                .status(orderRequest.getStatus().name())
                .billerOrderCreationTime(orderRequest.getBillerOrderCreationTime())
                .billerOrderNo(orderRequest.getBillerOrderNo())
                .billerOrderActionNo(orderRequest.getBillerOrderActionNo())
                .statusUpdateDate(orderRequest.getStatusUpdateDate())
                .statusUpdateSource(orderRequest.getStatusUpdateSource())
                .rejectReason(orderRequest.getRejectReason())
                .attemptCount(orderRequest.getAttemptCount())
                .submittedAttemptsRemaining(orderRequest.getSubmittedAttemptsRemaining())
                .completedAttemptsRemaining(orderRequest.getCompletedAttemptsRemaining())
                .nextPollAt(orderRequest.getNextPollAt())
                .lastPolledAt(orderRequest.getLastPolledAt())
                .lockUntil(orderRequest.getLockUntil())
                .lockedBy(orderRequest.getLockedBy())
                .updateContext(orderRequest.getUpdateContext())
                .trackingId(orderRequest.getTrackingId())
                .routeEnv(orderRequest.getRouteEnv())
                .build())
            .orElse(null);
    }

    private String extractImplErrorInfo(final String messageDescription) throws Exception {
        JsonNode root = mapper.readTree(messageDescription);
        JsonNode implErrorInfo = root.path("errors").get(0)
            .path("nestedError").path("ImplErrorInfo");
        return mapper.writeValueAsString(implErrorInfo);
    }

    private CeaseLineResponse buildSuccessResponseIfOrderCompleted(String customerId, String productOfferId, Long seqNo) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        Optional<OrderRequest> orderRequest = ceaseOrderRepo.findByCustomerIdAndProductOfferId(customerId, productOfferId);
        if (orderRequest.isPresent() && orderRequest.get().getStatus().equals(OrderStatus.SUCCESS)) {
            log.warn("Returning background process status: success for seqNo={} trackingId{}", seqNo, trackingId);
            return CeaseLineResponse.builder()
                .httpCode(200)
                .message("SUCCESS")
                .orderNo(orderRequest.get().getBillerOrderNo())
                .orderActionId(orderRequest.get().getBillerOrderActionNo())
                .build();
        }
        return null;
    }

    private IopCaseRequest buildIopCaseRequest(String customerId, String billerOrderNo, String billerOrderActionNo, String productOfferId, String bpmOrderId) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String uDescription =String.format(
            "Voluntary Cease order has been stuck..\n\nCustomerID:%s\nProductOfferID:%s\nBPMOrderID:%s\nOrderID:%s\nOrderActionID:%s",
            customerId, productOfferId, bpmOrderId, billerOrderNo, billerOrderActionNo
        );
        log.info("AMA case description: {} trackingID{}", uDescription, trackingId);
        String correlation = String.format("%s:%s:%s", trackingId, billerOrderNo, billerOrderActionNo);

        return IopCaseRequest.builder()
            .uOpenedFor("")
            .uAssignmentGroup("")
            .uSymptoms("-")
            .uCategory("XM - Order Management")
            .uSubcategory("Stuck Order")
            .uChannel("API")
            .uShortDescription(" Voluntary Cease - Stuck Order")
            .uDescription(uDescription)
            .uPriority("High")
            .uCorrelationId(correlation)
            .uAssignedTo(ServiceConfig.getCeaseLineAssignToUser())
            .build();
    }

    private IopCaseRequest buildIopCaseRequestWithState(
        String uNumber,
        String uAssignedTo,
        String uState
    ) {
        return IopCaseRequest.builder()
            .uNumber(uNumber)
            .uAssignedTo(uAssignedTo)
            .uState(uState)
            .build();
    }

    private CaseNumberAndState createCaseAndGetCaseNumber(IopCaseRequest request1, HttpHeaders httpHeaders) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String caseResponse = amaServiceConnector.callAmaService(
            HttpMethod.POST,
            ServiceConfig.getAmaApiCaseUrl(),
            request1,
            httpHeaders,
            String.class
        );

        List<Map<String, Object>> responseList;
        try {
            responseList = mapper.readValue(caseResponse, new TypeReference<List<Map<String, Object>>>() {});
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to parse case response", e);
        }

        if (!responseList.isEmpty()) {
            Map<String, Object> firstItem = responseList.get(0);
            if (firstItem.containsKey("result")) {
                List<ResultItem> resultItems = mapper.convertValue(firstItem.get("result"), new TypeReference<List<ResultItem>>() {});
                ResultItem firstResult = resultItems.get(0);
                log.error("Result status: {}, error: {}", firstResult.getStatus(), firstResult.getError_message());
                throw new CaseCreationException(firstResult.getError_message());
            } else {
                CaseResponse caseResponseHappy = mapper.convertValue(firstItem, CaseResponse.class);
                log.info("Case number: {}, trackingId: {}", caseResponseHappy.getCase_number(),
                    trackingId);
                return new CaseNumberAndState(caseResponseHappy.getCase_number(), caseResponseHappy.getState());
            }
        }
        throw new CaseCreationException("Empty response from AMA service");
    }

    private void fetchCaseStateAsync(IopCaseRequest requestWithState, String caseNumber, HttpHeaders httpHeaders) {
        CompletableFuture.runAsync(() -> {
            try {

                CaseNumberAndState caseNumberAndState = createCaseAndGetCaseNumber(requestWithState, httpHeaders);
                if (caseNumberAndState != null) {
                    log.info("Fetched case state for fire and forget AMA in progress {}: {}. Built request with state: {}",
                        caseNumberAndState.getCaseNumber(), caseNumberAndState.getState(), requestWithState);
                }
            } catch (Exception e) {
                log.warn("Failed to fetch case state for fire and forget for in progress scenario  {}: {}", caseNumber, e.getMessage());
            }
        });
    }

    private CeaseLineResponse buildPendingOrderResponse400(String orderId, String reason, String caseNumber) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String messageDesc = amaCaseEnabled && caseNumber != null
            ? reason + " Case Number: " + caseNumber
            : reason;
        log.info("Returning PENDING-ORDER response 400 for orderId: {}, messageDesc: {}, trackingId{}", orderId, messageDesc,
            trackingId);
        return CeaseLineResponse.builder()
            .httpCode(400)
            .message("PENDING-ORDER")
            .orderNo(orderId)
            .orderActionId(null)
            .messageDescription(messageDesc)
            .build();
    }

    private CeaseLineResponse buildAcceptedOrderResponse202(String orderId, final String orderActionId, String reason, String caseNumber) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String messageDesc = amaCaseEnabled && caseNumber != null
            ? reason + " Case Number: " + caseNumber
            : reason;
        log.info("Returning ACCEPTED response 202 for orderId: {}, messageDesc: {}, trackingId{}", orderId, messageDesc,
            trackingId);
        return CeaseLineResponse.builder()
            .httpCode(202)
            .message("ACCEPTED")
            .orderNo(orderId)
            .orderActionId(orderActionId)
            .messageDescription(messageDesc)
            .build();
    }

    private String maybeCreateAmaCase(String customerId, OrderRequest entity, String productOfferId, String bpmOrderId, HttpHeaders httpHeaders) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        if (!amaCaseEnabled) return null;
        IopCaseRequest request = buildIopCaseRequest(customerId, entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), productOfferId, bpmOrderId);
        String caseNumber = createCaseAndGetCaseNumber(request, httpHeaders).getCaseNumber();
        IopCaseRequest requestWithState = buildIopCaseRequestWithState(caseNumber, ServiceConfig.getCeaseLineAssignToUser(), "15");
        log.info("Created case in AMA for stuck cease order. Order ID: {}, Action ID: {}, Case Number: {}, trackingId{}",
            entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), caseNumber, trackingId);
        // Clone headers and set tracking ID explicitly
        HttpHeaders asyncHeaders = new HttpHeaders();
        asyncHeaders.putAll(httpHeaders);
        asyncHeaders.set(CEASE_LINE_TRACKING_ID, trackingId);

        // Guard the async call
        if (fetchStateAsyncEnabled) {
            fetchCaseStateAsync(requestWithState, caseNumber, asyncHeaders);
        } else {
            log.info("Async case state fetching for in progress state is disabled for case: {}", caseNumber);
        }
        return caseNumber;
    }
Source Link
yoda
  • 277
  • 1
  • 5
Loading