Class Receiver<M>

All Implemented Interfaces:
HasSender, HasTransactionAttribute, IbisExceptionListener, IConfigurable, IConfigurationAware, IHasProcessState<M>, IManagable, IMessageHandler<M>, INamedObject, IProvidesMessageBrowsers<M>, IScopeProvider, IThreadCountControllable, EventThrowing, HasStatistics, org.springframework.beans.factory.Aware, org.springframework.context.ApplicationContextAware

Wrapper for a listener that specifies a channel for the incoming messages of a specific Adapter. By choosing a listener, the Frank developer determines how the messages are received. For example, an ApiListener receives RESTful HTTP requests and a JavaListener receives messages from direct Java calls.

Apart from wrapping the listener, a Receiver can be configured to store received messages and to keep track of the processed / failed status of these messages.

There are two kinds of listeners: synchronous listeners and asynchronous listeners. Synchronous listeners are expected to return a response. The system that triggers the receiver typically waits for a response before proceeding with its operation. For example, when an ApiListener receives an HTTP request, the listener is expected to return an HTTP response.

Asynchronous listeners are not expected to return a response. The system that triggers the listener typically continues without waiting for the adapter to finish. When a receiver contains an asynchronous listener, it can have a sender that sends the transformed message to its destination. Receivers with an asynchronous listener can also have an error sender that is used by the receiver to send error messages. In other words: if the result state is SUCCESS then the message is sent by the ordinary sender, while the error sender is used if the result state is ERROR.
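The routing rule for asynchronous listeners can be sketched as follows. This is only an illustration under stated assumptions, not the framework's code: `AsyncRoutingSketch`, `ResultState`, `route`, and the `Consumer<String>` senders are hypothetical stand-ins for the result state and for the configured sender and error sender.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-ins; none of these names come from the Receiver API.
final class AsyncRoutingSketch {
    enum ResultState { SUCCESS, ERROR }

    // Uses the ordinary sender on SUCCESS and the error sender otherwise.
    static void route(ResultState state, String result,
                      Consumer<String> sender, Consumer<String> errorSender) {
        if (state == ResultState.SUCCESS) {
            sender.accept(result);
        } else {
            errorSender.accept(result);
        }
    }

    public static void main(String[] args) {
        List<String> sent = new ArrayList<>();
        List<String> errors = new ArrayList<>();
        route(ResultState.SUCCESS, "transformed message", sent::add, errors::add);
        route(ResultState.ERROR, "error message", sent::add, errors::add);
        System.out.println("sender: " + sent);
        System.out.println("errorSender: " + errors);
    }
}
```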

Transaction control

If transacted is set to true, messages will be received and processed under transaction control. This means that after a message has been read and processed and the transaction has ended, one of the following applies:
situation | input listener | Pipeline | inProcess storage | errorSender | summary of effect
successful | message read and committed | message processed | unchanged | unchanged | message processed
processing failed | message read and committed | message processing failed and rolled back | unchanged | message sent | message only transferred from listener to errorSender
listening failed | unchanged: listening rolled back | no processing performed | unchanged | unchanged | no changes, input message remains on input available for listener
transfer to inProcess storage failed | unchanged: listening rolled back | no processing performed | unchanged | unchanged | no changes, input message remains on input available for listener
transfer to errorSender failed | message read and committed | message processing failed and rolled back | message present | unchanged | message only transferred from listener to inProcess storage
If the application or the server crashes in the middle of one or more transactions, these transactions will be recovered and rolled back after the server/application is restarted. Exactly one of the following then applies to any message that was touched at any time by a transacted receiver in Ibis:
  • It is processed correctly by the pipeline and removed from the input queue, not present in inProcess storage and not sent to the errorSender
  • It is not processed at all by the pipeline, or processing by the pipeline has been rolled back; the message is removed from the input queue and is either still in inProcess storage or sent to the errorSender

commit or rollback
If transacted is set to true, messages will be either committed or rolled back. All message-processing transactions are committed, unless one or more of the following apply:

  • The PipeLine is transacted and the exitState of the pipeline is not equal to SUCCESS
  • A PipeRunException or another runtime exception has been thrown by any Pipe or by the PipeLine
  • The setRollBackOnly() method has been called on the userTransaction (not accessible by Pipes)
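The three rollback conditions above can be read as a single boolean decision. The sketch below is a hypothetical model of that decision, not the Receiver's implementation: `RollbackDecisionSketch`, `mustRollBack`, and its parameters are assumed names introduced for illustration.

```java
// Hypothetical model of the rollback conditions listed above; not framework code.
final class RollbackDecisionSketch {

    // Returns true when the message-processing transaction would be rolled back.
    static boolean mustRollBack(boolean pipelineTransacted, String exitState,
                                boolean exceptionThrown, boolean rollbackOnlySet) {
        // Condition 1: a transacted PipeLine ended in a state other than SUCCESS.
        boolean failedTransactedPipeline = pipelineTransacted && !"SUCCESS".equals(exitState);
        // Conditions 2 and 3: an exception was thrown, or rollback-only was requested.
        return failedTransactedPipeline || exceptionThrown || rollbackOnlySet;
    }

    public static void main(String[] args) {
        System.out.println(mustRollBack(true, "SUCCESS", false, false));
        System.out.println(mustRollBack(true, "ERROR", false, false));
        System.out.println(mustRollBack(false, "ERROR", false, false));
        System.out.println(mustRollBack(false, "SUCCESS", true, false));
    }
}
```

Note that in this reading a non-SUCCESS exit state of a PipeLine that is not transacted does not by itself cause a rollback.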

Author: Gerrit van Brakel
  • Field Details


      public static final org.springframework.transaction.TransactionDefinition TXREQUIRED

      public static final org.springframework.transaction.TransactionDefinition TXNEW_CTRL

      public org.springframework.transaction.TransactionDefinition TXNEW_PROC

      public static final String THREAD_CONTEXT_KEY_NAME

      public static final String THREAD_CONTEXT_KEY_TYPE

      public static final String RCV_CONFIGURED_MONITOR_EVENT

      public static final String RCV_STARTED_RUNNING_MONITOR_EVENT

      public static final String RCV_SHUTDOWN_MONITOR_EVENT

      public static final String RCV_SUSPENDED_MONITOR_EVENT

      public static final String RCV_RESUMED_MONITOR_EVENT

      public static final String RCV_THREAD_EXIT_MONITOR_EVENT

      public static final String RCV_MESSAGE_TO_ERRORSTORE_EVENT

      public static final String RCV_MESSAGE_LOG_COMMENTS

      public static final int RCV_SUSPENSION_MESSAGE_THRESHOLD

      public static final int MAX_RETRY_INTERVAL

      public static final String RETRY_FLAG_SESSION_KEY
    • runState

      protected final RunStateManager runState
  • Constructor Details

    • Receiver

      public Receiver()
  • Method Details

    • configurationSucceeded

      public boolean configurationSucceeded()
    • getLogPrefix

      protected String getLogPrefix()
    • info

      protected void info(String msg)
      Sends an informational message to the log and to the message keeper of the adapter.
    • warn

      protected void warn(String msg)
      Sends a warning to the log and to the message keeper of the adapter.
    • error

      protected void error(String msg, Throwable t)
      Sends an error message to the log and to the message keeper of the adapter.
    • openAllResources

      protected void openAllResources() throws ListenerException, TimeoutException
    • tellResourcesToStop

      protected void tellResourcesToStop()
      Must lead to a call to closeAllResources(), and the runstate must be 'STOPPING'. For an IPushingListener, this method calls closeAllResources(); for an IPullingListener, the PullingListenerContainer has to call closeAllResources().
    • closeAllResources

      protected void closeAllResources()
      Should only close resources when in state STOPPING (or ERROR). This should be the only trigger to change the state to STOPPED. On exit, resources must be closed so that the receiver RunState can be set to 'STOPPED'.
    • propagateName

      protected void propagateName()
    • configure

      public void configure() throws ConfigurationException
      This method is called by the IAdapter to let the receiver do things to initialize itself before the startListening method is called.
      Specified by:
      configure in interface IConfigurable
      Overrides:
      configure in class TransactionAttributes
      Throws:
      ConfigurationException - when initialization did not succeed.
    • startRunning

      public void startRunning()
      Description copied from interface: IManagable
      Instruct the object that implements IManagable to start working. The method does not wait for completion of the command; at return of this method, the object might be still in the STARTING-runstate
      Specified by:
      startRunning in interface IManagable
    • stopRunning

      public void stopRunning()
      Description copied from interface: IManagable
      Instruct the object that implements IManagable to stop working. The method does not wait for completion of the command; at return of this method, the object might be still in the STOPPING-runstate
      Specified by:
      stopRunning in interface IManagable
    • knownProcessStates

      public Set<ProcessState> knownProcessStates()
      Description copied from interface: IHasProcessState
      Provides the set of ProcessStates used by this listener.
      Specified by:
      knownProcessStates in interface IHasProcessState<M>
    • targetProcessStates

      public Map<ProcessState,Set<ProcessState>> targetProcessStates()
      Description copied from interface: IHasProcessState
      Provides the set of ProcessStates that a message in the specified state can be moved to, e.g. from a MessageBrowser for that state.
      Specified by:
      targetProcessStates in interface IHasProcessState<M>
    • changeProcessState

      public RawMessageWrapper<M> changeProcessState(RawMessageWrapper<M> message, ProcessState toState, String reason) throws ListenerException
      Description copied from interface: IHasProcessState
      Change the processState of the message to the specified state, if that state is supported. If it is not supported, nothing changes, and null is returned.
      Specified by:
      changeProcessState in interface IHasProcessState<M>
      Returns:
      the moved message, or null if no message was moved.
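The relationship between targetProcessStates() and changeProcessState() can be illustrated with a small stand-alone model. Everything in it is an assumption made for the example: the `State` enum, the contents of the `TARGETS` map, and the `change` helper are hypothetical and are not taken from the Receiver or from ProcessState.

```java
import java.util.EnumMap;
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model: a map of allowed target states per source state.
final class ProcessStateSketch {
    enum State { AVAILABLE, INPROCESS, DONE, ERROR, HOLD }

    // Example transition table; the real table depends on the listener in use.
    static final Map<State, Set<State>> TARGETS = new EnumMap<>(State.class);
    static {
        TARGETS.put(State.ERROR, EnumSet.of(State.AVAILABLE, State.HOLD));
        TARGETS.put(State.HOLD, EnumSet.of(State.AVAILABLE));
    }

    // Returns the new state, or null when the requested move is not supported.
    static State change(State from, State to) {
        Set<State> allowed = TARGETS.getOrDefault(from, EnumSet.noneOf(State.class));
        return allowed.contains(to) ? to : null;
    }

    public static void main(String[] args) {
        System.out.println(change(State.ERROR, State.AVAILABLE));
        System.out.println(change(State.DONE, State.ERROR));
    }
}
```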
    • getMessageBrowser

      public IMessageBrowser<M> getMessageBrowser(ProcessState state)
      Description copied from interface: IProvidesMessageBrowsers
      returns a browser of messages that are in ProcessState 'state', and are stored in a storage managed by the listener itself (as opposed to a storage configured as a messageLog or errorStorage in the configuration).
      Specified by:
      getMessageBrowser in interface IProvidesMessageBrowsers<M>
    • startProcessingMessage

      protected void startProcessingMessage(long waitingDuration)
    • finishProcessingMessage

      protected void finishProcessingMessage(long processingDuration)
    • moveInProcessToError

      public void moveInProcessToError(RawMessageWrapper<M> rawMessageWrapper, Map<String,Object> context, Instant receivedDate, String comments, org.springframework.transaction.TransactionDefinition txDef)
      Move a message from the "in process" state or storage, to the error state or storage.
      Parameters:
      rawMessageWrapper - Wrapper for the raw message, may be an instance of RawMessageWrapper or MessageWrapper. If it is an instance of RawMessageWrapper, then the IListener will be used to extract the full Message object to be sent to the error storage.
      context - Context of the process. Can be either the thread context of an IPullingListener, or the current PipeLineSession.
      receivedDate - Timestamp of when the message was received.
      comments - Processing comments and error message regarding the reason the message was rejected.
      txDef - TransactionDefinition for the transaction to be used for moving the message to error state / storage.
    • processRequest

      public Message processRequest(IListener<M> origin, @Nonnull RawMessageWrapper<M> rawMessage, @Nonnull Message message, @Nonnull PipeLineSession session) throws ListenerException
      Process the received message with processRequest(IListener, RawMessageWrapper, Message, PipeLineSession). A messageId is generated that is unique and consists of the name of this listener and a GUID. N.B. Callers of this method should clear the remaining ThreadContext if it is not to be returned to their callers.
      Specified by:
      processRequest in interface IMessageHandler<M>
    • processRawMessage

      public void processRawMessage(IListener<M> origin, RawMessageWrapper<M> rawMessage, @Nonnull PipeLineSession session, boolean duplicatesAlreadyChecked) throws ListenerException
      Description copied from interface: IMessageHandler
      Specified by:
      processRawMessage in interface IMessageHandler<M>
    • processRawMessage

      public void processRawMessage(IListener<M> origin, RawMessageWrapper<M> rawMessage, @Nonnull PipeLineSession session, long waitingDuration, boolean duplicatesAlreadyChecked) throws ListenerException
      Description copied from interface: IMessageHandler
      Specified by:
      processRawMessage in interface IMessageHandler<M>
    • retryMessage

      public void retryMessage(String storageKey) throws ListenerException
    • cacheProcessResult

      public org.frankframework.receivers.Receiver.ProcessResultCacheItem cacheProcessResult(String messageId, String errorMessage, Instant receivedDate)
    • getCachedErrorMessage

      public String getCachedErrorMessage(String messageId)
    • getDeliveryCount

      public int getDeliveryCount(RawMessageWrapper<M> rawMessage)
    • exceptionThrown

      public void exceptionThrown(INamedObject object, Throwable t)
      Description copied from interface: IbisExceptionListener
      Inform the implementing class that the exception t occurred in object.
      Specified by:
      exceptionThrown in interface IbisExceptionListener
    • exceptionThrown

      public void exceptionThrown(String errorMessage, Throwable t)
    • getEventSourceName

      public String getEventSourceName()
      Specified by:
      getEventSourceName in interface EventThrowing
    • throwEvent

      protected void throwEvent(String eventCode)
    • resetRetryInterval

      public void resetRetryInterval()
    • increaseRetryIntervalAndWait

      public void increaseRetryIntervalAndWait(Throwable t, String description)
    • isThreadCountReadable

      public boolean isThreadCountReadable()
      Specified by:
      isThreadCountReadable in interface IThreadCountControllable
    • isThreadCountControllable

      public boolean isThreadCountControllable()
      Specified by:
      isThreadCountControllable in interface IThreadCountControllable
    • getCurrentThreadCount

      public int getCurrentThreadCount()
      Specified by:
      getCurrentThreadCount in interface IThreadCountControllable
    • getMaxThreadCount

      public int getMaxThreadCount()
      Specified by:
      getMaxThreadCount in interface IThreadCountControllable
    • increaseThreadCount

      public void increaseThreadCount()
      Specified by:
      increaseThreadCount in interface IThreadCountControllable
    • decreaseThreadCount

      public void decreaseThreadCount()
      Specified by:
      decreaseThreadCount in interface IThreadCountControllable
    • setRunState

      @Protected public void setRunState(RunState state)
      Changes runstate. Always stops the receiver when state is ERROR.
    • getRunState

      public RunState getRunState()
      Get the runstate of this receiver.
      Specified by:
      getRunState in interface IManagable
    • isInRunState

      public boolean isInRunState(RunState someRunState)
    • formatException

      public Message formatException(String extraInfo, String messageId, Message message, Throwable t)
      Description copied from interface: IMessageHandler
      Formats any exception thrown by any of the above methods to a message that can be returned. Can be used if the calling system has no other way of returning the exception to the caller.
      Specified by:
      formatException in interface IMessageHandler<M>
    • getListenerContainer

      public PullingListenerContainer<M> getListenerContainer()
    • setListenerContainer

      public void setListenerContainer(PullingListenerContainer<M> listenerContainer)
    • createListenerContainer

      public PullingListenerContainer<M> createListenerContainer()
    • getProcessStatistics

      protected io.micrometer.core.instrument.DistributionSummary getProcessStatistics(int threadsProcessing)
    • getIdleStatistics

      protected io.micrometer.core.instrument.DistributionSummary getIdleStatistics(int threadsProcessing)
    • getProcessStatistics

      public Iterable<io.micrometer.core.instrument.DistributionSummary> getProcessStatistics()
      Returns an Iterable over the process statistics.
    • getIdleStatistics

      public Iterable<io.micrometer.core.instrument.DistributionSummary> getIdleStatistics()
      Returns an Iterable over the idle statistics.
    • isOnErrorContinue

      public boolean isOnErrorContinue()
    • getMessagesReceived

      public double getMessagesReceived()
      Get the number of messages received by this receiver.
    • getMessagesRetried

      public double getMessagesRetried()
      Get the number of duplicate messages received by this receiver.
    • getMessagesRejected

      public double getMessagesRejected()
      Get the number of messages rejected (discarded or put in errorStorage).
    • getLastMessageDate

      public long getLastMessageDate()
    • resetNumberOfExceptionsCaughtWithoutMessageBeingReceived

      public void resetNumberOfExceptionsCaughtWithoutMessageBeingReceived()
    • toString

      public String toString()
      Returns a toString of this class by introspection and the toString() value of its listener.
      Overrides:
      toString in class Object
    • setListener

      public void setListener(IListener<M> newListener)
      Sets the listener from which messages are received.
    • setSender

      public void setSender(ISender sender)
      Sender to which the response (output of PipeLine) should be sent. Applies if the receiver has an asynchronous listener. N.B. Sending correlated responses via this sender is not supported.
    • setErrorSender

      public void setErrorSender(ISender errorSender)
      Sender that will send the result in case the PipeLineExit state was not SUCCESS. Applies if the receiver has an asynchronous listener.
    • setErrorStorage

      public void setErrorStorage(ITransactionalStorage<Serializable> errorStorage)
      Storage to keep track of messages that failed processing
    • setMessageLog

      public void setMessageLog(ITransactionalStorage<Serializable> messageLog)
      Storage to keep track of all messages processed correctly
    • setName

      public void setName(String newName)
      Sets the name of the Receiver, as known to the Adapter. If the listener implements the name interface and getName() of the listener is empty, the name of this object is given to the listener.
      Specified by:
      setName in interface INamedObject
    • setOnError

      public void setOnError(Receiver.OnError value)
      One of 'continue', 'recover' or 'close'. Controls the behaviour of the Receiver, when it encounters an error during processing of a message.
      Default value
    • setNumThreads

      public void setNumThreads(int newNumThreads)
      The number of threads that may execute a Pipeline concurrently (only for pulling listeners)
      Default value
    • setNumThreadsPolling

      public void setNumThreadsPolling(int i)
      The number of threads that are actively polling for messages concurrently. '0' means 'limited only by numThreads' (only for pulling listeners)
      Default value
    • setPollInterval

      public void setPollInterval(int i)
      The number of seconds waited after an unsuccessful poll attempt, before another poll attempt is made. Only for polling listeners, not for e.g. jms, webservice or javaListeners
      Default value
    • setStartTimeout

      public void setStartTimeout(int i)
      Timeout (in seconds) to start the receiver. If this timeout is exceeded, the Receiver startup is aborted, all resources are closed, and the receiver will be in state EXCEPTION_STARTING. A new start command may then be issued.
    • setStopTimeout

      public void setStopTimeout(int i)
      Timeout (in seconds) to stop the receiver. If this timeout is exceeded, stopping will be aborted and the receiver will be in state EXCEPTION_STOPPING. The receiver will no longer be running but some resources might not have been cleaned up properly.
    • setCheckForDuplicates

      public void setCheckForDuplicates(boolean b)
      If set to true, each message is checked for presence in the messageLog. If already present, it is not processed again. Only required for messaging that is not XA compatible. Requires a messageLog!
      Default value
    • setCheckForDuplicatesMethod

      public void setCheckForDuplicatesMethod(Receiver.CheckForDuplicatesMethod method)
      (Only used when checkForDuplicates=true) Indicates whether the messageid or the correlationid is used for checking presence in the message log
      Default value
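The duplicate check described by checkForDuplicates and checkForDuplicatesMethod can be pictured with the following stand-alone sketch. It assumes an in-memory set in place of the messageLog and records a key as soon as the message is first seen, which simplifies whatever the Receiver actually does; `DuplicateCheckSketch`, `Method`, and `accept` are hypothetical names.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical model of a duplicate check keyed on message id or correlation id.
final class DuplicateCheckSketch {
    enum Method { MESSAGEID, CORRELATIONID }

    // Stand-in for the messageLog: only the keys of messages seen so far.
    private final Set<String> messageLog = new HashSet<>();
    private final Method method;

    DuplicateCheckSketch(Method method) {
        this.method = method;
    }

    // Returns true if the message is new and should be processed, false if it is a duplicate.
    boolean accept(String messageId, String correlationId) {
        String key = (method == Method.MESSAGEID) ? messageId : correlationId;
        return messageLog.add(key);
    }

    public static void main(String[] args) {
        DuplicateCheckSketch check = new DuplicateCheckSketch(Method.MESSAGEID);
        System.out.println(check.accept("m1", "c1")); // first delivery
        System.out.println(check.accept("m1", "c2")); // same message id again
    }
}
```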
    • setMaxDeliveries

      public void setMaxDeliveries(int i)
      The maximum delivery count after which to stop processing the message (only for listeners that know the delivery count of received messages). If set to -1, the delivery count is ignored.
      Default value
    • setMaxRetries

      public void setMaxRetries(int i)
      The number of times a processing attempt is automatically retried after an exception is caught or a rollback is experienced. If maxRetries < 0, the number of attempts is infinite.
      Default value
    • setProcessResultCacheSize

      public void setProcessResultCacheSize(int processResultCacheSize)
      Size of the cache to keep process results, used by maxRetries
      Default value
    • setReturnedSessionKeys

      @Deprecated @ConfigurationWarning("attribute is no longer used. Please use attribute returnedSessionKeys of the JavaListener if the set of sessionsKeys that can be returned to callers session must be limited.") public void setReturnedSessionKeys(String string)
    • setCorrelationIDXPath

      public void setCorrelationIDXPath(String string)
      XPath expression to extract correlationid from message
    • setCorrelationIDNamespaceDefs

      public void setCorrelationIDNamespaceDefs(String correlationIDNamespaceDefs)
      Namespace definitions for correlationIDXPath. Must be in the form of a comma or space separated list of prefix=namespaceuri-definitions
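To make the expected format concrete, the sketch below parses a comma or space separated list of prefix=namespaceuri definitions into a map. It is an illustrative parser only; `NamespaceDefsSketch`, `parse`, and the example URIs are assumptions and do not describe how the framework reads this attribute.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical parser for "prefix=namespaceuri" lists separated by commas or whitespace.
final class NamespaceDefsSketch {

    static Map<String, String> parse(String defs) {
        Map<String, String> result = new LinkedHashMap<>();
        for (String def : defs.trim().split("[,\\s]+")) {
            if (def.isEmpty()) {
                continue; // skip the empty token produced by an empty input or a leading separator
            }
            int eq = def.indexOf('=');
            if (eq <= 0) {
                throw new IllegalArgumentException("expected prefix=namespaceuri, got: " + def);
            }
            // Split on the first '=' only, so a URI may itself contain '='.
            result.put(def.substring(0, eq), def.substring(eq + 1));
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(parse("ns1=http://example.org/a, ns2=urn:example:b"));
    }
}
```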
    • setCorrelationIDStyleSheet

      public void setCorrelationIDStyleSheet(String string)
      Stylesheet to extract correlationID from message
    • setLabelXPath

      public void setLabelXPath(String string)
      XPath expression to extract label from message
    • setLabelNamespaceDefs

      public void setLabelNamespaceDefs(String labelNamespaceDefs)
      Namespace definitions for labelXPath. Must be in the form of a comma or space separated list of prefix=namespaceuri-definitions
    • setLabelStyleSheet

      public void setLabelStyleSheet(String string)
      Stylesheet to extract label from message
    • setChompCharSize

      public void setChompCharSize(String string)
      If set (>=0) and the character data length inside an XML element exceeds this size, the character data is chomped (with a clear comment)
    • setElementToMove

      public void setElementToMove(String string)
      If set, the character data in this XML element is stored inside a session key and in the message it is replaced by a reference to this session key: {sessionKey: + elementToMoveSessionKey + }
    • setElementToMoveSessionKey

      public void setElementToMoveSessionKey(String string)
      (Only used when elementToMove or elementToMoveChain is set) Name of the session key wherein the character data is stored
      Default value
      ref_ + the name of the element
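The effect of elementToMove with the default session key can be sketched as below. This is a deliberately naive illustration that uses a regular expression on the first matching element without attributes; it is an assumption that a simple text substitution conveys the idea, and `ElementToMoveSketch` and `move` are hypothetical names, with a plain `Map` standing in for the session.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical illustration: move an element's character data into a session map.
final class ElementToMoveSketch {

    static String move(String xml, String element, Map<String, Object> session) {
        String sessionKey = "ref_" + element; // default key: ref_ + the name of the element
        String quoted = Pattern.quote(element);
        Pattern pattern = Pattern.compile("<" + quoted + ">(.*?)</" + quoted + ">", Pattern.DOTALL);
        Matcher matcher = pattern.matcher(xml);
        if (!matcher.find()) {
            return xml; // element not present: message is left unchanged
        }
        session.put(sessionKey, matcher.group(1));
        // Replace only the character data, keeping the surrounding tags.
        return xml.substring(0, matcher.start(1))
                + "{sessionKey:" + sessionKey + "}"
                + xml.substring(matcher.end(1));
    }

    public static void main(String[] args) {
        Map<String, Object> session = new HashMap<>();
        System.out.println(move("<doc><data>ABC</data></doc>", "data", session));
        System.out.println(session);
    }
}
```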
    • setElementToMoveChain

      public void setElementToMoveChain(String string)
      Like elementToMove but element is preceded with all ancestor elements and separated by semicolons (e.g. adapter;pipeline;pipe)
    • setRemoveCompactMsgNamespaces

      public void setRemoveCompactMsgNamespaces(boolean b)
    • setHideRegex

      public void setHideRegex(String hideRegex)
      Regular expression to mask strings in the errorStore/logStore. Every character between the strings in this expression will be replaced by a '*'. For example, the regular expression (?<=<party>).*?(?=</party>) will replace every character between keys <party> and </party>.
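The example expression can be tried out with plain java.util.regex. The sketch below replaces every matched character by '*', which is one reading of the description above and does not take hideMethod into account; `HideRegexSketch` and `mask` are hypothetical names and not part of the Receiver.

```java
import java.util.regex.Pattern;

// Hypothetical helper: mask every character matched by hideRegex with '*'.
final class HideRegexSketch {

    static String mask(String text, String hideRegex) {
        return Pattern.compile(hideRegex)
                .matcher(text)
                .replaceAll(match -> "*".repeat(match.group().length()));
    }

    public static void main(String[] args) {
        String regex = "(?<=<party>).*?(?=</party>)";
        // prints <party>****</party>
        System.out.println(mask("<party>John</party>", regex));
    }
}
```

The lookbehind and lookahead keep the <party> tags themselves out of the match, so only the text between them is masked.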
    • setHideMethod

      public void setHideMethod(IMessageBrowser.HideMethod hideMethod)
      Only used when hideRegex is not empty
      Default value
    • setHiddenInputSessionKeys

      public void setHiddenInputSessionKeys(String string)
      Comma separated list of keys of session variables which are available when the PipelineSession is created and of which the value will not be shown in the log (replaced by asterisks)
    • setForceRetryFlag

      public void setForceRetryFlag(boolean b)
      If set to true, every message read will be processed as if it is being retried, by setting a session variable to "retry".
      Default value
    • setNumberOfExceptionsCaughtWithoutMessageBeingReceivedThreshold

      public void setNumberOfExceptionsCaughtWithoutMessageBeingReceivedThreshold(int number)
      Number of failed connection attempts after which the adapter is put in warning status
      Default value
    • getConfigurationClassLoader

      public ClassLoader getConfigurationClassLoader()
      Description copied from interface: IScopeProvider
      This ClassLoader is set upon creation of the object, used to retrieve resources configured by the Ibis application.
      Specified by:
      getConfigurationClassLoader in interface IScopeProvider
      Returns:
      the ClassLoader created by the ClassLoaderManager.
    • getApplicationContext

      public org.springframework.context.ApplicationContext getApplicationContext()
      Specified by:
      getApplicationContext in interface IConfigurationAware
    • setApplicationContext

      public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext)
      Specified by:
      setApplicationContext in interface org.springframework.context.ApplicationContextAware
    • getOnError

      public Receiver.OnError getOnError()
    • getName

      public String getName()
      Specified by:
      getName in interface IConfigurationAware
      Specified by:
      getName in interface INamedObject
    • getNumThreads

      public int getNumThreads()
    • getNumThreadsPolling

      public int getNumThreadsPolling()
    • getPollInterval

      public int getPollInterval()
    • getStartTimeout

      public int getStartTimeout()
    • getStopTimeout

      public int getStopTimeout()
    • isForceRetryFlag

      public boolean isForceRetryFlag()
    • isCheckForDuplicates

      public boolean isCheckForDuplicates()
    • getCheckForDuplicatesMethod

      public Receiver.CheckForDuplicatesMethod getCheckForDuplicatesMethod()
    • getMaxDeliveries

      public int getMaxDeliveries()
    • getMaxRetries

      public int getMaxRetries()
    • getProcessResultCacheSize

      public int getProcessResultCacheSize()
    • isSupportProgrammaticRetry

      public boolean isSupportProgrammaticRetry()
    • getCorrelationIDXPath

      public String getCorrelationIDXPath()
    • getCorrelationIDNamespaceDefs

      public String getCorrelationIDNamespaceDefs()
    • getCorrelationIDStyleSheet

      public String getCorrelationIDStyleSheet()
    • getLabelXPath

      public String getLabelXPath()
    • getLabelNamespaceDefs

      public String getLabelNamespaceDefs()
    • getLabelStyleSheet

      public String getLabelStyleSheet()
    • getChompCharSize

      public String getChompCharSize()
    • getElementToMove

      public String getElementToMove()
    • getElementToMoveSessionKey

      public String getElementToMoveSessionKey()
    • getElementToMoveChain

      public String getElementToMoveChain()
    • isRemoveCompactMsgNamespaces

      public boolean isRemoveCompactMsgNamespaces()
    • getHideRegex

      public String getHideRegex()
    • getHideMethod

      public IMessageBrowser.HideMethod getHideMethod()
    • getHiddenInputSessionKeys

      public String getHiddenInputSessionKeys()
    • isNumberOfExceptionsCaughtWithoutMessageBeingReceivedThresholdReached

      public boolean isNumberOfExceptionsCaughtWithoutMessageBeingReceivedThresholdReached()
    • getAdapter

      public Adapter getAdapter()
      Specified by:
      getAdapter in interface EventThrowing
      Specified by:
      getAdapter in interface HasStatistics
    • setAdapter

      public void setAdapter(Adapter adapter)
    • getListener

      public IListener<M> getListener()
    • getErrorSender

      public ISender getErrorSender()
    • getMessageLog

      public ITransactionalStorage<Serializable> getMessageLog()
    • getErrorStorage

      public ITransactionalStorage<Serializable> getErrorStorage()
    • getSender

      public ISender getSender()
      Specified by:
      getSender in interface HasSender
    • setConfigurationMetrics

      public void setConfigurationMetrics(MetricsInitializer configurationMetrics)
    • getTxManager

      public org.springframework.transaction.PlatformTransactionManager getTxManager()
    • setTxManager

      public void setTxManager(org.springframework.transaction.PlatformTransactionManager txManager)
    • setEventPublisher

      public void setEventPublisher(EventPublisher eventPublisher)