代码之家  ›  专栏  ›  技术社区  ›  user3007501

使用cloudhopper发送异步PDU响应

  •  2
  • user3007501  · 技术社区  · 10 年前

    我的项目使用Cloudhopper5.0.6库来保持SMPP连接(3.4版本)并发送或接收PDU。 我需要修改默认的PDUResopnse,因此,自定义的PDU处理是通过以下方式扩展DefaultSmppSessionHandler来组织的:

     public class SmppSessionHandlerController extends DefaultSmppSessionHandler {
    
        @Override
        public PduResponse firePduRequestReceived(PduRequest pduRequest) {
            PduRequestHandler pduReqHandler = pduRequestHandler;
            PduResponse resultPduResponse = pduRequest.createResponse();
            return processDefaultPduResponse(resultPduResponse);
        }
    
        private PduResponse processDefaultPduResponse(PduResponse pduResponse) {
            //do some transformations here on pduResponse...
            return pduResponse;
        }
     }
    

    它仅适用于以下目的:

    1. 更改结果命令状态或某些pdu字段/tlv参数
    2. 不发送当前PDU请求的任何响应。要做到这一点 已收到firePduRequest 方法必须返回 无效的 .

    现在我需要添加延迟的PDU响应发送,问题开始了。 我的第一次尝试是这样的:

        @Override
        public PduResponse firePduRequestReceived(PduRequest pduRequest) {
            PduRequestHandler pduReqHandler = pduRequestHandler;
            PduResponse resultPduResponse = pduRequest.createResponse();
            return processDefaultPduResponse(resultPduResponse);
        }
    
        private PduResponse processDefaultPduResponse(PduResponse pduResponse) {
            try {
                    Thread.sleep(responseDelay);
                } catch (InterruptedException e) {
                    throw new RuntimeException("Response delay interrupted", e);
                }
            return pduResponse;
        }
    

    当前线程的休眠被添加到延迟发送响应,因此调用线程被保留了 响应延迟 毫秒。如果此会话的同时没有更多请求,则此操作正常。在同一会话中添加一些submit_sm负载导致错误:

    com.cloudhopper.smpp.type.SmppTimeoutException: Unable to get response within [10000 ms]
        at com.cloudhopper.smpp.impl.DefaultSmppSession.sendRequestAndGetResponse(DefaultSmppSession.java:471) ~[ch-smpp-5.0.6.jar:5.0.6]
        at com.cloudhopper.smpp.impl.DefaultSmppSession.enquireLink(DefaultSmppSession.java:439) ~[ch-smpp-5.0.6.jar:5.0.6] 
    

    在coudhopper源代码中搜索后,我发现了问题,这是DefaultSmppSession类中任何操作的排除窗口锁定:

     future = sendWindow.offer(pdu.getSequenceNumber(), pdu, timeoutMillis, configuration.getRequestExpiryTimeout(), synchronous);
    

    问题出在 com.cloudhopper.commons.util.windowing.窗口 类,该类使用独占锁来执行任何操作,因此在一个线程中返回PRUResponse并从另一个线程发出请求之前不可能等待。

    下一次尝试是返回 无效的 作为请求处理(丢弃请求而不发送任何响应),并使用 com.cloudhopper.smpp.SmppSession.sendResponsePdu(pduResponse) 方法这种方法有效一段时间,但最终总会出现以下异常:

    com.cloudhopper.smpp.type.SmppChannelException: null
        at com.cloudhopper.smpp.impl.DefaultSmppSession.sendResponsePdu(DefaultSmppSession.java:581) ~[ch-smpp-5.0.6.jar:5.0.6]
        at com.svzn.autotest.smppclient.impl.cloudhopper.SmppSendingManager.sendPduResponse(SmppSendingManager.java:84) ~[smpp-client-1.0.1.jar:na]
        at com.svzn.autotest.smppclient.impl.cloudhopper.util.SendPduCommand.sendPduResponse(SendPduCommand.java:80) [smpp-client-1.0.1.jar:na]
        at com.svzn.autotest.smppclient.impl.cloudhopper.SmppClientImpl.sendPduResponse(SmppClientImpl.java:91) [smpp-client-1.0.1.jar:na]
        at com.svzn.autotest.example.testng_aggr.lib.smpp.event.BaseEventProcessor$1.run(BaseEventProcessor.java:62) [test-classes/:na]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) [na:1.6.0_37]
        at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:303) [na:1.6.0_37]
        at java.util.concurrent.FutureTask.run(FutureTask.java:138) [na:1.6.0_37]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:98) [na:1.6.0_37]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:206) [na:1.6.0_37]
        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [na:1.6.0_37]
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [na:1.6.0_37]
        at java.lang.Thread.run(Thread.java:662) [na:1.6.0_37]
    Caused by: org.jboss.netty.handler.timeout.WriteTimeoutException: null
        at org.jboss.netty.handler.timeout.WriteTimeoutHandler.<clinit>(WriteTimeoutHandler.java:79) ~[netty-3.9.0.Final.jar:na]
        at com.cloudhopper.smpp.impl.DefaultSmppClient.createSession(DefaultSmppClient.java:259) ~[ch-smpp-5.0.6.jar:5.0.6]
        at com.cloudhopper.smpp.impl.DefaultSmppClient.doOpen(DefaultSmppClient.java:226) ~[ch-smpp-5.0.6.jar:5.0.6]
        at com.cloudhopper.smpp.impl.DefaultSmppClient.bind(DefaultSmppClient.java:193) ~[ch-smpp-5.0.6.jar:5.0.6]
        at com.svzn.autotest.smppclient.impl.cloudhopper.tasks.RebindTask.run(RebindTask.java:37) ~[smpp-client-1.0.1.jar:na]
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:439) [na:1.6.0_37]
        at java.util.concurrent.FutureTask$Sync.innerRunAndReset(FutureTask.java:317) [na:1.6.0_37]
        at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:150) [na:1.6.0_37]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$101(ScheduledThreadPoolExecutor.java:98) [na:1.6.0_37]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.runPeriodic(ScheduledThreadPoolExecutor.java:180) [na:1.6.0_37]
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoo
    
    lExecutor.java:204) [na:1.6.0_37
    
    ]
        ... 3 common frames omitted
    

    不知道如何修复此错误或以另一种方式在同一会话中发送异步PDURresponse。 你对此有什么想法吗?

    1 回复  |  直到 10 年前
        1
  •  2
  •   user3007501    10 年前

    最后,我发现了问题。问题出现在不正确的同步块中,它阻止了并行异步事件处理(发送pdu响应),并以通常的方式处理请求和响应而不进行处理。

    调用完全可以 com.cloudhopper.smpp.SmppSession.sendResponsePdu(pduResponse) 一个线程中的方法,通过扩展处理请求和响应 默认SmppSessionHandler 来自另一个。一切都将在同一会话中处理。

    更新: 下面是我用来处理pdu请求的实现:

    public class SmppSessionHandlerController extends DefaultSmppSessionHandler {
        private static final Logger log = LoggerFactory.getLogger(SmppSessionHandlerController.class);
    
        private volatile PduHandler pduHandler;
    private PduResponseHandler pduResponseHandler;
    private PduRequestHandler pduRequestHandler;
    
    public SmppSessionHandlerController() {
        super(log);
    }
    
    public PduHandler getPduHandler() {
        return pduHandler;
    }
    
    public void setPduHandler(PduHandler pduHandler) {
        this.pduHandler = pduHandler;
    }
    
    public PduResponseHandler getPduResponseHandler() {
        return pduResponseHandler;
    }
    
    public void setPduResponseHandler(PduResponseHandler pduResponseHandler) {
        this.pduResponseHandler = pduResponseHandler;
    }
    
    public PduRequestHandler getPduRequestHandler() {
        return pduRequestHandler;
    }
    
    public void setPduRequestHandler(PduRequestHandler pduRequestHandler) {
        this.pduRequestHandler = pduRequestHandler;
    }
    
    @Override
    public void fireExpectedPduResponseReceived(PduAsyncResponse pduAsyncResponse) {
        log.trace("Handling response PDU: {}", pduAsyncResponse);
        pduAsyncResponse.getResponse().setReferenceObject(pduAsyncResponse.getRequest().getReferenceObject());
        processPduResponse(pduAsyncResponse.getResponse());
    }
    
    
    @Override
    public void fireUnexpectedPduResponseReceived(PduResponse pduResponse) {
        log.warn("Handling unexpected response PDU: {}", pduResponse);
        processPduResponse(pduResponse);
    }
    
    @Override
    public boolean firePduReceived(Pdu pdu) {
        PduHandler currPduHandler = pduHandler;
        if (currPduHandler != null) {
            SmppPdu smppPdu = PduToApiConverter.convertToApiObject(pdu);
            currPduHandler.handlePduReceived(smppPdu);
        }
        // default handling is to accept pdu for processing up chain
        return true;
    }
    
    public void firePduRequestExpired(PduRequest pduRequest) {
        super.firePduRequestExpired(pduRequest);
    }
    
    private void processPduResponse(PduResponse pduResponse) {
        HandlersContextHelper referenceObj = (HandlersContextHelper) pduResponse.getReferenceObject();
        if (referenceObj != null) {
            referenceObj.getSequenceIdHolder().addReceivedSequenceId(pduResponse.getSequenceNumber());
        }
        PduResponseHandler pduRespHandler = pduResponseHandler;
        if (pduRespHandler != null) {
            SmppPduResponse smppPduResponse = PduToApiConverter.convertToApiResponse(pduResponse);
            if (smppPduResponse != null) {
                pduRespHandler.handlePduResponse(smppPduResponse);
            }
        }
        if (referenceObj != null) {
            referenceObj.getSequenceIdHolder().checkSentAndReceivedClosed();
        }
    }
    
    @Override
    public PduResponse firePduRequestReceived(PduRequest pduRequest) {
        PduRequestHandler pduReqHandler = pduRequestHandler;
        PduResponse resultPduResponse = pduRequest.createResponse();
        if (pduReqHandler == null) {
            return resultPduResponse;
        }
        PduResponse defaultPduResponse = pduRequest.createResponse();
        SmppPduRequest smppPduRequest = PduToApiConverter.convertToApiRequest(pduRequest);
        SmppPduResponse defaultSmppPduResponse = PduToApiConverter.convertToApiResponse(defaultPduResponse);
        if (smppPduRequest == null || defaultSmppPduResponse == null) {
            return resultPduResponse;
        }
        SmppPduResponse resultSmppPduResponse = pduReqHandler.handlePduRequest(smppPduRequest, defaultSmppPduResponse);
        if (resultSmppPduResponse == null) {
            return null;
        }
        PduResponse convertedPduResponse = ApiToPduConverter.convertToPduResponse(resultSmppPduResponse);
        if (convertedPduResponse == null) {
            return resultPduResponse;
        }
        if (!resultPduResponse.getClass().isAssignableFrom(convertedPduResponse.getClass())) {
            return resultPduResponse;
        }
        return convertedPduResponse;
    }
    }
    

    Wich被添加到clowdhoppersmpp客户端,如下所示

    SmppSession session = smppClient.bind(SmppSessionConfiguration_instance, SmppSessionHandlerController_instance );
    

    我为处理smpp事件的特殊情况的PduHandlerPduRequestHandler和PduResponseHandler定义了自定义接口,您可以看到SmppSessionHandlerController只是将调用委托给其中一个。

    使用方法

    public PduResponse firePduRequestReceived(PduRequest pduRequest) 
    

    在SmppSessionHandler中定义您可以在同步模式下发送所需的任何响应。如果要在异步模式下执行此操作,请返回null pduResponse并使用SmppSession.sendResponsePdu(Pdu),而不是当前线程或任何其他线程。

    推荐文章