4
\$\begingroup\$

We have an API in which we call external service with a tracking id.This first part of the API workflow makes an entry to the DB as SUBMITTED after the call to first external API say /E1 finishes and says the order is in process.

Simultaneously a scheduler runs every 2 seconds and calls the row entry just saved as SUBMITTED and calls another API /E2 and marks the status as SUCCESS if the status is CLOSED from API.

There is poller logic as follows. There is SQS listener also part of the equation but I havent posted it here it also closes the order.

Following is the code it is very difficult to reason about it. We have a submit method and another schedule with the same method. Retry happens 3 times based on property.

This code runs on multi site multi instance on premise setup so running it with some local memory instance maps seem sus to me.

Properties: poll.plan.submitted=4,2,2 queue.initial-delay-seconds=4 worker.lock-ttl-seconds=1 worker.batch-size=50 worker.fixed-delay-ms=2000

PollWorker

@Slf4j
@RefreshScope
public class PollWorker {

enter code here
    private final PollingDao pollingDao;
    private final OrderRequestRepository ceaseOrderRepo;
    private final PollNotifier pollNotifier;
    private final PollService pollService;
    @Autowired
    private ServiceConfiguration config;

    @Value("${.worker.batch-size}")
    private int batch;

    @Value("${.worker.lock-ttl-seconds}")
    private int lockTtl;

    @Value("${.worker.instance-id}")
    private String instanceId;

    @Autowired
    @Qualifier("db.polling.executor")
    private ScheduledExecutorService executor;

    @Scheduled(fixedDelayString = "${.worker.fixed-delay-ms}")
    public void pollPendingRows() {
        if (config.getBoolean(".worker.polling.enabled", false)) {
            String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));
            log.info("Polling at {} for pending rows with batch size {} and lock TTL {} seconds", now, batch, lockTtl);
            List<Long> ids = pollingDao.claimPendingRows(batch, instanceId, lockTtl);
            for (Long id : ids) {
                startPoll(id);
            }
        }
    }

    public void startPoll(Long seqNo) {
        executor.submit(() -> doPoll(seqNo, 0));
    }

    private void doPoll(Long seqNo, int planIndex) {
        try {
            log.info("Polling seqNo={} at planIndex={}", seqNo, planIndex);
            pollService.handlePoll(seqNo);

            int nextDelay = pollService.getNextDelay(seqNo, planIndex);
            if (nextDelay > 0) {
                executor.schedule(() -> doPoll(seqNo, planIndex + 1), nextDelay, TimeUnit.SECONDS);
            } else {
                OrderRequest entity = ceaseOrderRepo.findBySeqNo(seqNo).orElse(null);
                CeaseLineResponse result = CeaseLineResponse.builder()
                    .httpCode(200)
                    .message("SUCCESS")
                    .orderNo(entity != null ? entity.getBillerOrderNo() : null)
                    .orderActionId(entity != null ? entity.getBillerOrderActionNo() : null)
                    .messageDescription("Order processed successfully")
                    .build();

                pollNotifier.complete(seqNo, result);

            }
        } catch (Exception ex) {
            log.error("Polling error on seqNo {}: {}", seqNo, ex.toString());
            pollNotifier.completeExceptionally(seqNo, ex);
        }
    }
}

PollNotifier

@Component
@Slf4j
public class PollNotifier {

    private final Map<Long, CompletableFuture<CeaseLineResponse>> pending = new ConcurrentHashMap<>();

    public CompletableFuture<CeaseLineResponse> register(Long seqNo) {
        CompletableFuture<CeaseLineResponse> future = new CompletableFuture<>();
        pending.put(seqNo, future);
        return future;
    }

    public void complete(Long seqNo, CeaseLineResponse response) {
        CompletableFuture<CeaseLineResponse> future = pending.remove(seqNo);
        if (future != null) {
            future.complete(response);
        } else {
            log.warn("No future found for seqNo {}", seqNo);
        }
    }

    public void completeExceptionally(Long seqNo, Throwable ex) {
        CompletableFuture<CeaseLineResponse> future = pending.remove(seqNo);
        if (future != null) {
            future.completeExceptionally(ex);
        }
    }

}

PollPlans

@RefreshScope
@Getter
public class PollPlans {
    private final int[] completedPlan;
    private final int[] submittedPlan;
    private final int initialDelaySeconds;

    public PollPlans(
        @Value("${.poll.plan.completed}") String completed,
        @Value("${.poll.plan.submitted}") String submitted,
        @Value("${.queue.initial-delay-seconds}") int initialDelaySeconds) {
        this.completedPlan = Arrays.stream(completed.split(",")).map(String::trim).mapToInt(Integer::parseInt).toArray();
        this.submittedPlan = Arrays.stream(submitted.split(",")).map(String::trim).mapToInt(Integer::parseInt).toArray();
        this.initialDelaySeconds = initialDelaySeconds;
    }
}

PollService

@RequiredArgsConstructor
@Slf4j
public class PollService {

    private final OrderRequestRepository ceaseOrderRepo;
    private final AuditService auditService;
    private final OrderProcessor ;
    private final PollPlans plans;
    private final PollNotifier pollNotifier;
    private final PollingDao pollingDao;

    @Value("${.poll.plan.completed}") String completedPlan;
    @Value("${.poll.plan.submitted}") String submittedPlan;

    @Transactional
    public void handlePoll(Long seqNo) {
        final OrderRequest orderRequest = ceaseOrderRepo.findById(seqNo).orElseThrow();

        if (orderRequest.getStatus() == OrderStatus.SUBMITTED) {
            log.info("Inside handlePoll for seqNo={} trackingId{}", seqNo, orderRequest.getTrackingId());
            if (orderRequest.getBillerOrderNo() == null) {
                final OrderStatusAudit audit = auditService.audit(seqNo, "POLL_SKIP", "No billerOrderNo");
                if (audit != null) log.info("BillerOrderNo null from DB. Audit: {} trackingId={}", audit, orderRequest.getTrackingId());
                return;
            }
            try {
                OrderStatusResponse orderStatus = .getCeaseOrderStatus(orderRequest.getCustomerId(), orderRequest.getBillerOrderNo()
                    , orderRequest.getRouteEnv(), orderRequest.getSc(), orderRequest.getTrackingId());

                String status = orderStatus != null && orderStatus.getMessage() != null ? orderStatus.getMessage().getStatus() : null;
                String id = orderStatus != null && orderStatus.getMessage() != null ? orderStatus.getMessage().getId() : null;

                final OrderStatusAudit audit = auditService.audit(seqNo, "API_GET", " getOrderStatus OK: orderId=" + id + " status=" + status);
                if (audit != null) log.info("After getOrderStatus from . Audit: {} trackingId={}", audit, orderRequest.getTrackingId());

                if (id != null && !id.equals(orderRequest.getBillerOrderNo())) {
                    final OrderStatusAudit pollWarn = auditService.audit(seqNo, "POLL_WARN", "ID mismatch expected=" + orderRequest.getBillerOrderNo() + " got=" + id);
                    if (pollWarn != null) log.info("ID mismatch expected. Audit: {} trackingId={}", pollWarn, orderRequest.getTrackingId());
                }

                if ("CLOSED".equalsIgnoreCase(status)) {
                    orderRequest.setStatus(OrderStatus.SUCCESS);
                    orderRequest.setStatusUpdateDate(Instant.now());
                    orderRequest.setStatusUpdateSource("API");
                    orderRequest.setNextPollAt(null);
                    orderRequest.setLockUntil(null);
                    orderRequest.setLockedBy(null);
                    orderRequest.setSubmittedAttemptsRemaining(orderRequest.getSubmittedAttemptsRemaining() - 1);
                    orderRequest.setAttemptCount(orderRequest.getAttemptCount() + 1);
                    final OrderRequest save = ceaseOrderRepo.save(orderRequest);
                    log.info("Obtained CLOSED status. Updated row: {} trackingId={}", save, orderRequest.getTrackingId());
                    final OrderStatusAudit audit1 = auditService.audit(seqNo, "UPDATE", "COMPLETED via API");
                    if (audit1 != null) log.info("Obtained CLOSED status. Audit: {} trackingId={}", audit1, orderRequest.getTrackingId());

                    pollNotifier.complete(seqNo, CeaseLineResponse.builder()
                        .httpCode(200)
                        .message("SUCCESS")
                        .orderNo(orderRequest.getBillerOrderNo())
                        .orderActionId(orderRequest.getBillerOrderActionNo())
                        .build());
                } else {
                    int idx = plans.getSubmittedPlan().length - orderRequest.getSubmittedAttemptsRemaining() + 1;
                    int nextSec = (idx < plans.getSubmittedPlan().length) ? plans.getSubmittedPlan()[idx] : 0;

                    if (orderRequest.getSubmittedAttemptsRemaining() > 1) {
                        int rows = pollingDao.decrementSubmittedRemaining(seqNo, nextSec);
                        final OrderStatusAudit update = auditService.audit(seqNo, "UPDATE", "SUBMITTED still pending; rows=" + rows + "; next in " + nextSec + "s");
                        if (update != null) log.info("SUBMITTED status. Audit: {} trackingId={}", update, orderRequest.getTrackingId());
                    } else {
                        orderRequest.setNextPollAt(null);
                        orderRequest.setLockUntil(null);
                        orderRequest.setLockedBy(null);
                        orderRequest.setSubmittedAttemptsRemaining(orderRequest.getSubmittedAttemptsRemaining() - 1);
                        orderRequest.setAttemptCount(orderRequest.getAttemptCount() + 1);
                        final OrderRequest save = ceaseOrderRepo.save(orderRequest);
                        log.info("SUBMITTED (polling finished). Updated row: {} trackingId={}", save, orderRequest.getTrackingId());
                        final OrderStatusAudit update = auditService.audit(seqNo, "UPDATE", "SUBMITTED (polling finished)");
                        if (update != null) log.info("SUBMITTED (polling finished). Audit: {} trackingId={}", update, orderRequest.getTrackingId());

                        pollNotifier.complete(seqNo, CeaseLineResponse.builder()
                            .httpCode(202)
                            .message("ACCEPTED")
                            .orderNo(orderRequest.getBillerOrderNo())
                            .orderActionId(orderRequest.getBillerOrderActionNo())
                            .messageDescription("Order created but still processing")
                            .build());
                    }
                }
            } catch (Exception ex) {
                orderRequest.setStatus(OrderStatus.FAILED);
                orderRequest.setRejectReason("Poll error: " + ex.getMessage());
                orderRequest.setNextPollAt(null);
                orderRequest.setLockUntil(null);
                orderRequest.setLockedBy(null);
                final OrderRequest save = ceaseOrderRepo.save(orderRequest);
                log.info("Obtained FAILED status. Updated row: {} trackingId{}", save, orderRequest.getTrackingId());
                final OrderStatusAudit update = auditService.audit(seqNo, "UPDATE", "FAILED on poll: " + ex.getMessage());
                if (update != null) log.info("Obtained FAILED status. Audit: {} trackingId={}", update, orderRequest.getTrackingId());

                pollNotifier.complete(seqNo, CeaseLineResponse.builder()
                    .httpCode(500)
                    .message("FAILED")
                    .orderNo(orderRequest.getBillerOrderNo())
                    .orderActionId(orderRequest.getBillerOrderActionNo())
                    .messageDescription(orderRequest.getRejectReason())
                    .build());
            }
        }
    }

    public int getNextDelay(Long seqNo, int planIndex) {
        List<Integer> plan = getPlanForSeqNo(seqNo);
        if (planIndex < plan.size() - 1) {
            return plan.get(planIndex + 1);
        }
        return 0;
    }

    private List<Integer> getPlanForSeqNo(Long seqNo) {
        Optional<OrderRequest> optEntity = ceaseOrderRepo.findById(seqNo);
        if (optEntity.isEmpty()) {
            log.warn("No OrderRequest found for seqNo={}", seqNo);
            return Collections.emptyList();
        }

        OrderRequest entity = optEntity.get();

        String planStr;
        if (entity.getStatus() == OrderStatus.SUBMITTED) planStr = submittedPlan;
        else if (entity.getStatus() == OrderStatus.COMPLETED) planStr = completedPlan;
        else planStr = submittedPlan;

        return Arrays.stream(planStr.split(","))
            .map(String::trim)
            .map(Integer::parseInt)
            .collect(Collectors.toList());
    }


}

PollingDao

@RequiredArgsConstructor
@Slf4j
public class PollingDao {

    private final JdbcTemplate jdbc;

    @Transactional
    public List<Long> claimPendingRows(int batch, String instanceId, int lockTtlSeconds) {
        String sel = ""
            + "SELECT seq_no FROM order_requests WHERE status in ('SUBMITTED') AND next_poll_at <= CURRENT_TIMESTAMP LIMIT ? FOR UPDATE SKIP LOCKED";

        List<Long> ids = jdbc.query(sel, ps -> ps.setInt(1, batch),
            (ResultSet rs, int rowNum) -> rs.getLong(1));

        if (ids.isEmpty()) return ids;

        StringBuilder placeholders = new StringBuilder();
        for (int i=0;i<ids.size();i++){ if(i>0) placeholders.append(","); placeholders.append("?"); }

        String lockUpdate = ""
            + "UPDATE order_requests "
            + "SET lock_until = DATE_ADD(UTC_TIMESTAMP(), INTERVAL ? SECOND), locked_by = ?, last_polled_at = UTC_TIMESTAMP() "
            + "WHERE seq_no IN (" + placeholders + ")";

        jdbc.update(con -> {
            final PreparedStatement ps = con.prepareStatement(lockUpdate);
            int idx = 1;
            ps.setInt(idx++, lockTtlSeconds);
            ps.setString(idx++, instanceId);
            for (Long id : ids) ps.setLong(idx++, id);
            return ps;
        });

        log.info("Claimed rows {} by {}", ids, instanceId);
        return ids;
    }

    public int decrementSubmittedRemaining(Long seqNo, int secondsToNext) {
        String sql = "UPDATE order_requests SET submitted_attempts_remaining=submitted_attempts_remaining-1, "
            + " next_poll_at = DATE_ADD(UTC_TIMESTAMP(), INTERVAL ? SECOND), "
            + "attempt_count = attempt_count + 1 "
            + "WHERE seq_no=? AND submitted_attempts_remaining>0";
        return jdbc.update(sql, secondsToNext, seqNo);
    }
}

Config

@Bean(name = "db.polling.executor")
public ScheduledExecutorService executor() {
    ThreadFactory threadFactory = new ThreadFactory() {
        final AtomicLong count = new AtomicLong(1);

        @Override
        public Thread newThread(Runnable r) {
            Thread thread = new Thread(r);
            thread.setName("DB-polling-Thread-" + count.getAndIncrement());
            thread.setPriority(Thread.MAX_PRIORITY);
            return thread;
        }
    };
    RejectedExecutionHandler rejectionHandler = (runnable, executor) -> LOGGER.info(
        runnable.toString() + " Subscription queue is full. Active Threads are: {} and work queue length is {} ",
        executor.getActiveCount(),
        executor.getQueue().size()
    );

    ScheduledThreadPoolExecutor scheduledExecutor = new ScheduledThreadPoolExecutor(
        Integer.parseInt(config.getString(DB_POLLING_THREAD_COREPOOL_SIZE, "20")),
        threadFactory,
        rejectionHandler
    );
    scheduledExecutor.setMaximumPoolSize(Integer.parseInt(config.getString(DB_POLLING_THREAD_MAXPOOL_SIZE, "50")));
    scheduledExecutor.setKeepAliveTime(Long.parseLong(config.getString(DB_POLLING_THREAD_TIMETOLIVE, "300000")), TimeUnit.MILLISECONDS);
    scheduledExecutor.setRemoveOnCancelPolicy(true);
    return scheduledExecutor;
}

OrderRequestRepository

@Repository
public interface OrderRequestRepository extends JpaRepository<OrderRequest, Long> {

    Optional<OrderRequest> findBySeqNo(Long seqNo);

    Optional<OrderRequest> findByCustomerIdAndProductOfferId(String customerId, String productOfferId);

    @Modifying
    @Query("update OrderRequest o set o.lockUntil = :lockUntil, o.lockedBy = :lockedBy where o.seqNo = :seqNo")
    int lockRow(Long seqNo, Instant lockUntil, String lockedBy);

    @Modifying
    @Query("update OrderRequest o set o.status = :status, o.statusUpdateDate = :updateDate, o.statusUpdateSource=:source where o.seqNo = :seqNo")
    int updateStatus(Long seqNo, OrderStatus status, Instant updateDate, String source);

    Optional<OrderRequest> findByBillerOrderNoAndBillerOrderActionNo(String billerOrderNo, String billerOrderActionNo);
} 

Controller

@PreAuthorize("#oauth2.hasScope('::ceaseorder')")
@PostMapping("/cease/ceaseline")
public ResponseEntity<CeaseLineResponse> createCeaseOrder(
   
    @RequestHeader("customerId") String customerId,
    @RequestParam(value = Constants.SALES_CHANNEL, required = false, defaultValue = Constants.SALES_CHANNEL_DEFAULT) String sc,
    @RequestParam(value = Constants.QUERY_PARAM_LOCALE, required = false,defaultValue = Constants.LOCALE_DEFAULT) String lo,
    @Valid @RequestBody CreateAndSubmitSuspendResumeCeaseOrderRequest request) {
    if ((routeEnv == null || routeEnv.isEmpty())) {
        LOGGER.info("routeEnv is : {}",routeEnv);
        ServiceContext.put(CEASE_LINE_ROUTE_ENV, DefaultEnv);
    }else{
        LOGGER.info("routeEnv is : {}",routeEnv);
        ServiceContext.put(CEASE_LINE_ROUTE_ENV, routeEnv);
    }
    ServiceContext.put(CEASE_LINE_TRACKING_ID, trackingId);
    ServiceContext.put(KEY_API_NAME, "ceaseline");
    ServiceContext.put(SALES_CHANNEL, sc);
    CeaseLineResponse resp = service.createCeaseOrder(customerId, request, sc, lo);
    return ResponseEntity.status(resp.getHttpCode()).body(resp);
}

OrderService

public class OrderService {

    private final OrderRequestRepository ceaseOrderRepo;
    private final AuditService auditService;
    private final OrderProcessor ;
    private final PollPlans plans;
    private final PollNotifier pollNotifier;

    @Value("${app.case.fetch-state-async.enabled:false}")
    private boolean fetchStateAsyncEnabled;

    @Value("${cease.future.timeout-seconds:10}")
    private int futureTimeoutSeconds;
    private final OrderRequestService orderRequestService;
    @Autowired
    private ServiceConfiguration config;
    @Autowired
    private ObjectMapper mapper;

    @Autowired
    private AmaServiceConnector amaServiceConnector;

    @Autowired
    private ServiceConfig ServiceConfig;

    @Value("${ama.case.enabled:false}")
    private boolean amaCaseEnabled;

    public CeaseLineResponse createCeaseOrder(String customerId, CreateAndSubmitSuspendResumeCeaseOrderRequest req,String sc,String lo) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        HttpHeaders httpHeaders = new HttpHeaders();
        httpHeaders.add(CEASE_LINE_TRACKING_ID,trackingId);
        httpHeaders.add(CEASE_LINE_ROUTE_ENV, ServiceContext.getString(CEASE_LINE_ROUTE_ENV));

        final CreateAndSubmitSuspendResumeCeaseOrderRequest.Payload payload = req.getPayload();
        final CreateAndSubmitSuspendResumeCeaseOrderRequest.RequestProductOfferDetail pod = payload.getRequestProductOfferDetails().get(0);
        String productOfferId = pod.getProductOfferID();
        OrderRequest entity = null;
        try {
            CeaseOrderResponse cor = .createCeaseOrder(customerId, req, sc, lo);

            String productOrderId = null;
            String productOrderActionIdx9 = null;

            if (cor != null
                && cor.getOutput() != null
                && cor.getOutput().getResponseProductOfferDetail() != null
                && !cor.getOutput().getResponseProductOfferDetail().isEmpty()
                && cor.getOutput().getResponseProductOfferDetail().get(0) != null) {
                productOrderId = cor.getOutput().getResponseProductOfferDetail().get(0).getProductOrderId();
                productOrderActionIdx9 = cor.getOutput().getResponseProductOfferDetail().get(0).getProductOrderActionIdx9();
            }

            entity = orderRequestService.persistNewOrderIfAbsent(customerId, productOfferId, payload, plans, StringUtils.isNotBlank(productOrderId) ? productOrderId : null, StringUtils.isNotBlank(productOrderActionIdx9) ? productOrderActionIdx9 : null);

            if (cor == null || cor.getOutput() == null || cor.getOutput().getResponseProductOfferDetail() == null || cor.getOutput().getResponseProductOfferDetail().isEmpty()) {
                if (null != entity) orderRequestService.markRejected(entity, "Empty response from ");
                return CeaseLineResponse.builder().httpCode(500).message("FAILED").orderNo(null).orderActionId(null).build();
            } else
                log.info("Obtained ceaseOrderResponse response from  for productOfferId: {}, trackingId: {},sc:{}", productOfferId, trackingId, sc);

            final List<CeaseOrderResponse.ResponseProductOfferDetail> productOfferDetail = cor.getOutput().getResponseProductOfferDetail();
            final CeaseOrderResponse.ResponseProductOfferDetail responseProductOfferDetail = productOfferDetail.get(0);

            final String errorCode = responseProductOfferDetail.getErrorCode();
            final String reason = responseProductOfferDetail.getReason();

            if (StringUtils.isNotBlank(errorCode) && StringUtils.isNotBlank(reason)) {
                // UC - 4
                if ("CEASED_PRODUCT".equalsIgnoreCase(errorCode) && reason.contains("Cease cannot be initiated on Ceased product")) {
                    log.info("Cease on Cease from DB. orderId={}, actionId={} trackingID={}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), trackingId);
                    return CeaseLineResponse.builder().httpCode(200).message("SUCCESS").orderNo(entity.getBillerOrderNo()).orderActionId(entity.getBillerOrderActionNo()).messageDescription(config.getString("cease.on.cease.message.description")).build();
                }

                // UC - 2
                if ("3009".equalsIgnoreCase(errorCode) && reason.contains("Existing Pending Orders exist on the subscriber")) {
                    Matcher matcher = Pattern.compile("(?:Suspend Order Id|Order Id)\\s*:\\s*(\\d+)").matcher(reason);
                    String orderId = null;
                    if (matcher.find()) orderId = matcher.group(1);
                    log.info("OrderId obtained from existing pending order from  response. orderId={}", orderId);
                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId{}", entity.getBillerOrderNo(),
                        entity.getBillerOrderActionNo(), trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildPendingOrderResponse400(orderId, reason, caseNumber);
                }
                //new uc
                if (("92000167".equalsIgnoreCase(errorCode) && reason != null && reason.contains("Pending order"))) {
                    Matcher matcher = Pattern.compile("Pending order (\\d+) found with reason (\\w+) that is not allowed to be cancelled").matcher(reason);
                    String orderId = null;
                    if (matcher.find()) orderId = matcher.group(1);
                    log.info("OrderId obtained from existing pending order from  response. orderId={}", orderId);
                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId{}", entity.getBillerOrderNo(),
                        entity.getBillerOrderActionNo(), trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildPendingOrderResponse400(orderId, reason, caseNumber);
                }
                // UC - 3
                if ("CEASED_PRODUCT".equalsIgnoreCase(errorCode) && reason.contains("Pending OA ID")) {
                    Matcher matcher = Pattern.compile("Pending OA ID\\s*:\\s*(\\d+)").matcher(reason);
                    String oaId = null;
                    if (matcher.find()) oaId = matcher.group(1);
                    log.info("Pending OA ID from  response: {}", oaId);
                    log.info("Cease on Pending OA from DB. Order ID: {}, Action ID: {}, trackingId: {}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), trackingId);
                    log.info("OrderActionId match check - : {}, DB: {}", oaId, entity.getBillerOrderActionNo());
                    return CeaseLineResponse.builder().httpCode(200).message("SUCCESS").orderNo(entity.getBillerOrderNo()).orderActionId(oaId).messageDescription(reason).build();
                }

                // US - 6, other failures
                if (("3009".equalsIgnoreCase(errorCode) && reason.contains("Invalid reason code")) || (("2000103".equalsIgnoreCase(errorCode) && reason.contains("Product with provided identifier could not be found")))) {
                    log.info("Bad request from . errorCode={}, reason={}, trackingId={}", errorCode, reason, trackingId);
                    return CeaseLineResponse.builder().httpCode(400).message("BAD-REQUEST").orderNo(null).orderActionId(null).messageDescription(reason).build();
                }

                // US - 6, other failures - with no reason code check
                if (StringUtils.isNotBlank(errorCode)) {
                    log.info("Biller error from . errorCode={}, reason={}, trackingId={}", errorCode, reason, trackingId);
                    return CeaseLineResponse.builder().httpCode(500).message("BILLER-ERROR").orderNo(null).orderActionId(null).messageDescription(reason).build();
                }
            }

            // UC - 1 & #5
            final String productOrderReferenceNumber = responseProductOfferDetail.getProductOrderReferenceNumber();
            if (StringUtils.isNotBlank(productOrderReferenceNumber) && entity != null) {
                entity.setStatus(OrderStatus.SUBMITTED);
                entity.setBillerOrderNo(responseProductOfferDetail.getProductOrderId());
                entity.setBillerOrderActionNo(responseProductOfferDetail.getProductOrderActionIdx9());
                entity.setBillerOrderCreationTime(Instant.now());
                entity.setRequestResponseTime(Instant.now());
                entity.setAttemptCount(entity.getAttemptCount());
                final OrderRequest save = ceaseOrderRepo.save(entity);
                log.info("Cease acknowledged by , productOrderReferenceNumber={}. Updated row: {}, trackingId: {}", productOrderReferenceNumber, save, trackingId);
                final OrderStatusAudit audit = auditService.audit(entity.getSeqNo(), "API_POST", " createCeaseOrder OK: order=" + responseProductOfferDetail.getProductOrderId());
                if (audit != null)
                    log.info("Cease acknowledged by , productOrderReferenceNumber={}. Audit: {}. TrackingID: {}", productOrderReferenceNumber, audit, trackingId);
            }

            if (entity != null && entity.getSeqNo() != null) {
                try {
                    log.info("Waiting for polling result for seqNo={} trackingId={}", entity.getSeqNo(), trackingId);
                    CompletableFuture<CeaseLineResponse> future = pollNotifier.register(entity.getSeqNo());
                    return future.get(futureTimeoutSeconds, TimeUnit.SECONDS);
                } catch (TimeoutException te) {
                    log.warn("Polling did not complete in time for seqNo={} trackingId{}", entity.getSeqNo(), trackingId);
                    CeaseLineResponse response = buildSuccessResponseIfOrderCompleted(customerId, productOfferId, entity.getSeqNo());
                    if (response != null) return response;

                    // call AMA to create case
                    log.info("Creating case in AMA for stuck cease order. Order ID: {}, Action ID: {}, trackingId: {}", entity.getBillerOrderNo(), entity.getBillerOrderActionNo(),
                        trackingId);
                    String bpmOrderId = req.getPayload().getBpmOrderInputX9() != null
                        ? req.getPayload().getBpmOrderInputX9().getBpmOrderIdx9()
                        : null;
                    String caseNumber = maybeCreateAmaCase(customerId, entity, productOfferId, bpmOrderId, httpHeaders);
                    return buildAcceptedOrderResponse202(entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), "Order created but still processing", caseNumber);
                } catch (Exception e) {
                    log.error("Error while waiting for poll result", e);
                    CeaseLineResponse response = buildSuccessResponseIfOrderCompleted(customerId, productOfferId, entity.getSeqNo());
                    if (response != null) return response;

                    return CeaseLineResponse.builder()
                        .httpCode(500)
                        .message("FAILED")
                        .orderNo(entity.getBillerOrderNo())
                        .orderActionId(entity.getBillerOrderActionNo())
                        .build();
                }
            }
        } catch (Exception ex) {
            String errorDetailMessage = null;
            if (ex instanceof ErrorResponseFromExternalException) {
                Throwable causedBy = ((ErrorResponseFromExternalException) ex).getCausedBy();
                if (causedBy instanceof HttpServerErrorException.InternalServerError) {
                    errorDetailMessage = ((HttpServerErrorException.InternalServerError) causedBy).getResponseBodyAsString();
                }
            }
            String errorMessage = null;
            if (null != errorDetailMessage) {
                try {
                    errorMessage = extractImplErrorInfo(errorDetailMessage);
                    log.error("Failed to create cease order for productOfferId: {}, message={}, trackingId{}", productOfferId, errorMessage,
                        trackingId);
                    if (null != entity) {
                        String errorMsg = " create error: " + (errorMessage != null ? errorMessage : ex.getMessage());
                        orderRequestService.markRejected(entity, errorMsg);
                    }
                } catch (Exception e) {
                    log.error("Failed to extractImplErrorInfo, message={}, exception={} trackingId{}", e.getMessage(), e, trackingId);
                }
            }
            return CeaseLineResponse.builder().httpCode(500).message("FAILED").orderNo(null).orderActionId(null).messageDescription(errorMessage).build();
        }
        return CeaseLineResponse.builder().httpCode(500).message("FAILED").build();
    }

    public OrderStatusView getStatus(String customerId, String productOfferId) {
        return ceaseOrderRepo.findByCustomerIdAndProductOfferId(customerId, productOfferId)
            .map(orderRequest -> OrderStatusView.builder()
                .seqNo(orderRequest.getSeqNo())
                .bpmOrderId(orderRequest.getBpmOrderId())
                .lineId(orderRequest.getLineId())
                .customerId(orderRequest.getCustomerId())
                .productOfferId(orderRequest.getProductOfferId())
                .reqCreateTime(orderRequest.getReqCreateTime())
                .requestResponseTime(orderRequest.getRequestResponseTime())
                .status(orderRequest.getStatus().name())
                .billerOrderCreationTime(orderRequest.getBillerOrderCreationTime())
                .billerOrderNo(orderRequest.getBillerOrderNo())
                .billerOrderActionNo(orderRequest.getBillerOrderActionNo())
                .statusUpdateDate(orderRequest.getStatusUpdateDate())
                .statusUpdateSource(orderRequest.getStatusUpdateSource())
                .rejectReason(orderRequest.getRejectReason())
                .attemptCount(orderRequest.getAttemptCount())
                .submittedAttemptsRemaining(orderRequest.getSubmittedAttemptsRemaining())
                .completedAttemptsRemaining(orderRequest.getCompletedAttemptsRemaining())
                .nextPollAt(orderRequest.getNextPollAt())
                .lastPolledAt(orderRequest.getLastPolledAt())
                .lockUntil(orderRequest.getLockUntil())
                .lockedBy(orderRequest.getLockedBy())
                .updateContext(orderRequest.getUpdateContext())
                .trackingId(orderRequest.getTrackingId())
                .routeEnv(orderRequest.getRouteEnv())
                .build())
            .orElse(null);
    }

    private String extractImplErrorInfo(final String messageDescription) throws Exception {
        JsonNode root = mapper.readTree(messageDescription);
        JsonNode implErrorInfo = root.path("errors").get(0)
            .path("nestedError").path("ImplErrorInfo");
        return mapper.writeValueAsString(implErrorInfo);
    }

    private CeaseLineResponse buildSuccessResponseIfOrderCompleted(String customerId, String productOfferId, Long seqNo) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        Optional<OrderRequest> orderRequest = ceaseOrderRepo.findByCustomerIdAndProductOfferId(customerId, productOfferId);
        if (orderRequest.isPresent() && orderRequest.get().getStatus().equals(OrderStatus.SUCCESS)) {
            log.warn("Returning background process status: success for seqNo={} trackingId{}", seqNo, trackingId);
            return CeaseLineResponse.builder()
                .httpCode(200)
                .message("SUCCESS")
                .orderNo(orderRequest.get().getBillerOrderNo())
                .orderActionId(orderRequest.get().getBillerOrderActionNo())
                .build();
        }
        return null;
    }

    private IopCaseRequest buildIopCaseRequest(String customerId, String billerOrderNo, String billerOrderActionNo, String productOfferId, String bpmOrderId) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String uDescription =String.format(
            "Voluntary Cease order has been stuck..\n\nCustomerID:%s\nProductOfferID:%s\nBPMOrderID:%s\nOrderID:%s\nOrderActionID:%s",
            customerId, productOfferId, bpmOrderId, billerOrderNo, billerOrderActionNo
        );
        log.info("AMA case description: {} trackingID{}", uDescription, trackingId);
        String correlation = String.format("%s:%s:%s", trackingId, billerOrderNo, billerOrderActionNo);

        return IopCaseRequest.builder()
            .uOpenedFor("")
            .uAssignmentGroup("")
            .uSymptoms("-")
            .uCategory("XM - Order Management")
            .uSubcategory("Stuck Order")
            .uChannel("API")
            .uShortDescription(" Voluntary Cease - Stuck Order")
            .uDescription(uDescription)
            .uPriority("High")
            .uCorrelationId(correlation)
            .uAssignedTo(ServiceConfig.getCeaseLineAssignToUser())
            .build();
    }

    private IopCaseRequest buildIopCaseRequestWithState(
        String uNumber,
        String uAssignedTo,
        String uState
    ) {
        return IopCaseRequest.builder()
            .uNumber(uNumber)
            .uAssignedTo(uAssignedTo)
            .uState(uState)
            .build();
    }

    private CaseNumberAndState createCaseAndGetCaseNumber(IopCaseRequest request1, HttpHeaders httpHeaders) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String caseResponse = amaServiceConnector.callAmaService(
            HttpMethod.POST,
            ServiceConfig.getAmaApiCaseUrl(),
            request1,
            httpHeaders,
            String.class
        );

        List<Map<String, Object>> responseList;
        try {
            responseList = mapper.readValue(caseResponse, new TypeReference<List<Map<String, Object>>>() {});
        } catch (JsonProcessingException e) {
            throw new RuntimeException("Failed to parse case response", e);
        }

        if (!responseList.isEmpty()) {
            Map<String, Object> firstItem = responseList.get(0);
            if (firstItem.containsKey("result")) {
                List<ResultItem> resultItems = mapper.convertValue(firstItem.get("result"), new TypeReference<List<ResultItem>>() {});
                ResultItem firstResult = resultItems.get(0);
                log.error("Result status: {}, error: {}", firstResult.getStatus(), firstResult.getError_message());
                throw new CaseCreationException(firstResult.getError_message());
            } else {
                CaseResponse caseResponseHappy = mapper.convertValue(firstItem, CaseResponse.class);
                log.info("Case number: {}, trackingId: {}", caseResponseHappy.getCase_number(),
                    trackingId);
                return new CaseNumberAndState(caseResponseHappy.getCase_number(), caseResponseHappy.getState());
            }
        }
        throw new CaseCreationException("Empty response from AMA service");
    }

    private void fetchCaseStateAsync(IopCaseRequest requestWithState, String caseNumber, HttpHeaders httpHeaders) {
        CompletableFuture.runAsync(() -> {
            try {

                CaseNumberAndState caseNumberAndState = createCaseAndGetCaseNumber(requestWithState, httpHeaders);
                if (caseNumberAndState != null) {
                    log.info("Fetched case state for fire and forget AMA in progress {}: {}. Built request with state: {}",
                        caseNumberAndState.getCaseNumber(), caseNumberAndState.getState(), requestWithState);
                }
            } catch (Exception e) {
                log.warn("Failed to fetch case state for fire and forget for in progress scenario  {}: {}", caseNumber, e.getMessage());
            }
        });
    }

    private CeaseLineResponse buildPendingOrderResponse400(String orderId, String reason, String caseNumber) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String messageDesc = amaCaseEnabled && caseNumber != null
            ? reason + " Case Number: " + caseNumber
            : reason;
        log.info("Returning PENDING-ORDER response 400 for orderId: {}, messageDesc: {}, trackingId{}", orderId, messageDesc,
            trackingId);
        return CeaseLineResponse.builder()
            .httpCode(400)
            .message("PENDING-ORDER")
            .orderNo(orderId)
            .orderActionId(null)
            .messageDescription(messageDesc)
            .build();
    }

    private CeaseLineResponse buildAcceptedOrderResponse202(String orderId, final String orderActionId, String reason, String caseNumber) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        String messageDesc = amaCaseEnabled && caseNumber != null
            ? reason + " Case Number: " + caseNumber
            : reason;
        log.info("Returning ACCEPTED response 202 for orderId: {}, messageDesc: {}, trackingId{}", orderId, messageDesc,
            trackingId);
        return CeaseLineResponse.builder()
            .httpCode(202)
            .message("ACCEPTED")
            .orderNo(orderId)
            .orderActionId(orderActionId)
            .messageDescription(messageDesc)
            .build();
    }

    private String maybeCreateAmaCase(String customerId, OrderRequest entity, String productOfferId, String bpmOrderId, HttpHeaders httpHeaders) {
        String trackingId = ServiceContext.getString(CEASE_LINE_TRACKING_ID);
        if (!amaCaseEnabled) return null;
        IopCaseRequest request = buildIopCaseRequest(customerId, entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), productOfferId, bpmOrderId);
        String caseNumber = createCaseAndGetCaseNumber(request, httpHeaders).getCaseNumber();
        IopCaseRequest requestWithState = buildIopCaseRequestWithState(caseNumber, ServiceConfig.getCeaseLineAssignToUser(), "15");
        log.info("Created case in AMA for stuck cease order. Order ID: {}, Action ID: {}, Case Number: {}, trackingId{}",
            entity.getBillerOrderNo(), entity.getBillerOrderActionNo(), caseNumber, trackingId);
        // Clone headers and set tracking ID explicitly
        HttpHeaders asyncHeaders = new HttpHeaders();
        asyncHeaders.putAll(httpHeaders);
        asyncHeaders.set(CEASE_LINE_TRACKING_ID, trackingId);

        // Guard the async call
        if (fetchStateAsyncEnabled) {
            fetchCaseStateAsync(requestWithState, caseNumber, asyncHeaders);
        } else {
            log.info("Async case state fetching for in progress state is disabled for case: {}", caseNumber);
        }
        return caseNumber;
    }
\$\endgroup\$
0

1 Answer 1

7
\$\begingroup\$

durability

makes an entry to the DB

Good. I'm going to assume it is postgres, MariaDB, or some similarly scalable RDBMS. That is, we have ACID guarantees rather than a Valkey or NoSQL backend.

logging timestamps

String now = LocalDateTime.now().format(DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss.SSS"));

Extract that common need into a now() helper function, please.

Also, I'm surprised that slf4j isn't configured to stamp each log record.

sub-types

Map<Long, CompletableFuture<CeaseLineResponse>> pending = ...

I like it; looks good. It's very clear what's going on.

Consider defining class SeqNo to hold a sequence number long, in the interest of type safety.

unreachable

            log.warn("No future found for seqNo {}", seqNo);

Surely that's unreachable code, right? Seems like a more strident error than .warn() might be appropriate.

Or if racing workers routinely trigger that, maybe log nothing at all? Or design the raciness out of the system?

jitter

Rather than fixed two-second polling interval, consider adding up to a half-second of random delay. This prevents the common phenomenon of workers synchronizing their polling schedules due to differing "idle" vs "busy" response times from the common server they're querying.

curly braces

        if (ids.isEmpty()) return ids;
if (audit != null) log.info( ... );

At least it isn't on two lines like this:

if (audit != null)
    log.info( ... );

But still, put each if body within { } braces, please. Future maintainers will thank you.

Similar idioms appear a bunch of times in this codebase.

null checks

These are tediously long.

String status = orderStatus != null && orderStatus.getMessage() != null ? ...
String id     = orderStatus != null && orderStatus.getMessage() != null ? ...

Maybe rephrase with an if?

status = null;
id = null;
if (orderStatus != null && orderStatus.getMessage()) {
    status = orderStatus.getMessage().getStatus();
    id     = orderStatus.getMessage().getId();
}

Also, sequentially setting fields in some places, like the error handler, seems racy. Maybe protect these with a lock?

                orderRequest.setNextPollAt(null);
                orderRequest.setLockUntil(null);
                orderRequest.setLockedBy(null);

CSV helper

        return Arrays.stream(planStr.split(","))
            .map(String::trim)
            .map(Integer::parseInt)
            .collect(Collectors.toList());

There's a few places where you explode e.g. "6,7,8" into a list of integers. It's time to Extract Helper.

Similarly, this code wants an nPlaceHolders() helper:

    for (int i=0;i<ids.size();i++){ if(i>0) placeholders.append(","); placeholders.append("?"); }

global variable

A few places in PollingDao use UTC_TIMESTAMP(). This complicates writing automated unit and integration tests. Prefer to supply a clock object, which tests can control.

\$\endgroup\$
0

You must log in to answer this question.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.