@Category(value="Basic") public class Receiver<M> extends TransactionAttributes implements IManagable, IReceiverStatistics, IMessageHandler<M>, IProvidesMessageBrowsers<Object>, EventThrowing, IbisExceptionListener, HasSender, HasStatistics, IThreadCountControllable, org.springframework.beans.factory.BeanFactoryAware
Adapter
.
By choosing a listener, the Frank developer determines how the messages are received.
For example, an ApiListener
receives RESTful HTTP requests and a
JavaListener
receives messages from direct Java calls.
Receiver
can be configured
to store received messages and to keep track of the processed / failed
status of these messages.
ApiListener
receives a HTTP request, the listener is expected to return a
HTTP response. Asynchronous listeners are not expected to return a response. The system that
triggers the listener typically continues without waiting for the adapter to finish. When a
receiver contains an asynchronous listener, it can have a sender that sends the transformed
message to its destination. Receivers with an asynchronous listener can also have an error sender that is used
by the receiver to send error messages. In other words: if the result state is SUCCESS then the
message is sent by the ordinary sender, while the error sender is used if the result state
is ERROR.
transacted
is set to true
, messages will be received and processed under transaction control.
This means that after a message has been read and processed and the transaction has ended, one of the following apply:
situation | input listener | Pipeline | inProcess storage | errorSender | summary of effect |
---|---|---|---|---|---|
successful | message read and committed | message processed | unchanged | unchanged | message processed |
procesing failed | message read and committed | message processing failed and rolled back | unchanged | message sent | message only transferred from listener to errroSender |
listening failed | unchanged: listening rolled back | no processing performed | unchanged | unchanged | no changes, input message remains on input available for listener |
transfer to inprocess storage failed | unchanged: listening rolled back | no processing performed | unchanged | unchanged | no changes, input message remains on input available for listener |
transfer to errorSender failed | message read and committed | message processing failed and rolled back | message present | unchanged | message only transferred from listener to inProcess storage |
commit or rollback
If transacted
is set to true
, messages will be either committed or rolled back.
All message-processing transactions are committed, unless one or more of the following apply:
Modifier and Type | Class and Description |
---|---|
static class |
Receiver.CheckForDuplicatesMethod |
static class |
Receiver.OnError |
HasStatistics.Action
Modifier and Type | Field and Description |
---|---|
static int |
MAX_RETRY_INTERVAL |
static String |
RCV_CONFIGURATIONEXCEPTION_MONITOR_EVENT |
static String |
RCV_CONFIGURED_MONITOR_EVENT |
static String |
RCV_MESSAGE_LOG_COMMENTS |
static String |
RCV_MESSAGE_TO_ERRORSTORE_EVENT |
static String |
RCV_RESUMED_MONITOR_EVENT |
static String |
RCV_SHUTDOWN_MONITOR_EVENT |
static String |
RCV_STARTED_RUNNING_MONITOR_EVENT |
static String |
RCV_SUSPENDED_MONITOR_EVENT |
static int |
RCV_SUSPENSION_MESSAGE_THRESHOLD |
static String |
RCV_THREAD_EXIT_MONITOR_EVENT |
static String |
RETRY_FLAG_SESSION_KEY |
protected RunStateManager |
runState |
static String |
THREAD_CONTEXT_KEY_NAME |
static String |
THREAD_CONTEXT_KEY_TYPE |
static org.springframework.transaction.TransactionDefinition |
TXNEW_CTRL |
org.springframework.transaction.TransactionDefinition |
TXNEW_PROC |
static org.springframework.transaction.TransactionDefinition |
TXREQUIRED |
log
Constructor and Description |
---|
Receiver() |
Modifier and Type | Method and Description |
---|---|
void |
cacheProcessResult(String messageId,
String errorMessage,
Date receivedDate) |
M |
changeProcessState(Object message,
ProcessState toState,
String reason)
Change the processState of the message to the specified state, if that state
is supported.
|
protected void |
closeAllResources()
Should only close resources when in state stopping (or error)! this should be the only trigger to change the state to stopped
On exit resources must be 'closed' so the receiver RunState can be set to 'STOPPED'
|
boolean |
configurationSucceeded() |
void |
configure()
This method is called by the
IAdapter to let the
receiver do things to initialize itself before the startListening
method is called. |
PullingListenerContainer<M> |
createListenerContainer() |
void |
decreaseThreadCount() |
protected void |
error(String msg,
Throwable t)
sends a error message to the log and to the messagekeeper of the adapter
|
void |
exceptionThrown(INamedObject object,
Throwable t)
Inform the implementing class that the exception
t occurred in object . |
void |
exceptionThrown(String errorMessage,
Throwable t) |
protected void |
finishProcessingMessage(long processingDuration) |
Message |
formatException(String extrainfo,
String messageId,
Message message,
Throwable t)
Formats any exception thrown by any of the above methods to a message that can be returned.
|
org.springframework.beans.factory.BeanFactory |
getBeanFactory() |
String |
getCachedErrorMessage(String messageId) |
int |
getCurrentThreadCount() |
int |
getDeliveryCount(String messageId,
M rawMessage) |
String |
getEventSourceName() |
Iterable<StatisticsKeeper> |
getIdleStatistics()
Returns an iterator over the idle-statistics
|
protected StatisticsKeeper |
getIdleStatistics(int threadsProcessing) |
long |
getLastMessageDate() |
PullingListenerContainer<M> |
getListenerContainer() |
protected String |
getLogPrefix() |
int |
getMaxThreadCount() |
IMessageBrowser<Object> |
getMessageBrowser(ProcessState state)
returns a
browser of messages that are in ProcessState 'state', and are stored in a
storage managed by the listener itself (as opposed to a storage configured as a messageLog or errorStorage in the configuration). |
long |
getMessagesReceived()
get the number of messages received by this receiver.
|
long |
getMessagesRejected()
Get the number of messages rejected (discarded or put in errorStorage).
|
long |
getMessagesRetried()
get the number of duplicate messages received this receiver.
|
Iterable<StatisticsKeeper> |
getProcessStatistics()
Returns an iterator over the process-statistics
|
protected StatisticsKeeper |
getProcessStatistics(int threadsProcessing) |
Iterable<StatisticsKeeper> |
getQueueingStatistics() |
RunState |
getRunState()
Get the
runstate of this receiver. |
boolean |
hasProblematicHistory(String messageId,
boolean manualRetry,
Object rawMessageOrWrapper,
ThrowingSupplier<Message,ListenerException> messageSupplier,
Map<String,Object> threadContext,
String correlationId) |
void |
increaseRetryIntervalAndWait(Throwable t,
String description) |
void |
increaseThreadCount() |
protected void |
info(String msg)
sends an informational message to the log and to the messagekeeper of the adapter
|
boolean |
isInRunState(RunState someRunState) |
boolean |
isOnErrorContinue() |
boolean |
isThreadCountControllable() |
boolean |
isThreadCountReadable() |
void |
iterateOverStatistics(StatisticsKeeperIterationHandler hski,
Object data,
HasStatistics.Action action) |
Set<ProcessState> |
knownProcessStates()
Provides the set of ProcessStates used by this listener.
|
void |
moveInProcessToError(String originalMessageId,
String correlationId,
ThrowingSupplier<Message,ListenerException> messageSupplier,
Date receivedDate,
String comments,
Object rawMessage,
org.springframework.transaction.TransactionDefinition txDef) |
protected void |
openAllResources() |
void |
processRawMessage(IListener<M> origin,
M rawMessage)
Same as
IMessageHandler.processRawMessage(IListener,Object,PipeLineSession, boolean) , but now without context, for convenience |
void |
processRawMessage(IListener<M> origin,
M rawMessage,
PipeLineSession session,
boolean duplicatesAlreadyChecked)
Will use listener to perform getIdFromRawMessage(), getStringFromRawMessage and afterMessageProcessed
|
void |
processRawMessage(IListener<M> origin,
M rawMessage,
PipeLineSession session,
long waitingDuration,
boolean duplicatesAlreadyChecked)
Same as
IMessageHandler.processRawMessage(IListener,Object,PipeLineSession, boolean) , but now updates IdleStatistics too |
Message |
processRequest(IListener<M> origin,
M rawMessage,
Message message,
PipeLineSession session)
Process the received message with
processRequest(IListener, Object, Message, PipeLineSession) . |
protected void |
propagateName() |
protected void |
registerEvent(String eventCode) |
void |
resetNumberOfExceptionsCaughtWithoutMessageBeingReceived() |
void |
resetRetryInterval() |
void |
retryMessage(String storageKey) |
void |
setBeanFactory(org.springframework.beans.factory.BeanFactory beanFactory) |
void |
setCheckForDuplicates(boolean b)
If set to
true , each message is checked for presence in the messageLog. |
void |
setCheckForDuplicatesMethod(Receiver.CheckForDuplicatesMethod method)
(Only used when
checkForDuplicates=true ) Indicates whether the messageid or the correlationid is used for checking presence in the message log |
void |
setChompCharSize(String string)
If set (>=0) and the character data length inside a xml element exceeds this size, the character data is chomped (with a clear comment)
|
void |
setCorrelationIDNamespaceDefs(String correlationIDNamespaceDefs)
Namespace defintions for correlationIDXPath.
|
void |
setCorrelationIDStyleSheet(String string)
Stylesheet to extract correlationID from message
|
void |
setCorrelationIDXPath(String string)
XPath expression to extract correlationid from message
|
void |
setElementToMove(String string)
If set, the character data in this element is stored under a session key and in the message replaced by a reference to this session key: {sessionkey: +
elementToMoveSessionKey + } |
void |
setElementToMoveChain(String string)
Like
elementToMove but element is preceded with all ancestor elements and separated by semicolons (e.g. |
void |
setElementToMoveSessionKey(String string)
(Only used when
elementToMove is set) Name of the session key under which the character data is stored |
void |
setErrorSender(ISender errorSender)
Sender that will send the result in case the PipeLineExit state was not
SUCCESS . |
void |
setErrorStorage(ITransactionalStorage<Serializable> errorStorage)
Storage to keep track of messages that failed processing
|
void |
setForceRetryFlag(boolean b)
If set to
true , every message read will be processed as if it is being retried, by setting a session variable to "retry". |
void |
setHiddenInputSessionKeys(String string)
Comma separated list of keys of session variables which are available when the
PipelineSession is created and of which the value will not be shown in the log (replaced by asterisks) |
void |
setHideMethod(IMessageBrowser.HideMethod hideMethod)
Only used when hideRegex is not empty
|
void |
setHideRegex(String hideRegex)
Regular expression to mask strings in the errorStore/logStore.
|
void |
setInProcessStorage(ITransactionalStorage<Serializable> inProcessStorage)
Deprecated.
|
void |
setLabelNamespaceDefs(String labelNamespaceDefs)
Namespace defintions for labelXPath.
|
void |
setLabelStyleSheet(String string)
Stylesheet to extract label from message
|
void |
setLabelXPath(String string)
XPath expression to extract label from message
|
void |
setListener(IListener<M> newListener)
Sets the listener used to receive messages from.
|
void |
setListenerContainer(PullingListenerContainer<M> listenerContainer) |
void |
setMaxDeliveries(int i)
The maximum delivery count after which to stop processing the message (only for listeners that know the delivery count of received messages).
|
void |
setMaxRetries(int i)
The number of times a processing attempt is automatically retried after an exception is caught or rollback is experienced.
|
void |
setMessageLog(ITransactionalStorage<Serializable> messageLog)
Storage to keep track of all messages processed correctly
|
void |
setName(String newName)
Name of the Receiver as known to the Adapter
|
void |
setNumberOfExceptionsCaughtWithoutMessageBeingReceivedThreshold(int number)
Number of connection attemps to put the adapter in warning status
|
void |
setNumThreads(int newNumThreads)
The number of threads that may execute a Pipeline concurrently (only for pulling listeners)
|
void |
setNumThreadsPolling(int i)
The number of threads that are actively polling for messages concurrently.
|
void |
setOnError(Receiver.OnError value)
One of 'continue' or 'close'.
|
void |
setPollInterval(int i)
The number of seconds waited after an unsuccesful poll attempt before another poll attempt is made.
|
void |
setProcessResultCacheSize(int processResultCacheSize)
Size of the cache to keep process results, used by maxRetries
|
void |
setRemoveCompactMsgNamespaces(boolean b) |
void |
setReturnedSessionKeys(String string)
Deprecated.
|
void |
setRunState(RunState state)
Changes runstate.
|
void |
setSender(ISender sender)
Sender to which the response (output of
PipeLine ) should be sent. |
void |
setStartTimeout(int i)
timeout to start receiver.
|
void |
setStopTimeout(int i)
timeout to stopped receiver.
|
protected void |
startProcessingMessage(long waitingDuration) |
void |
startRunning()
Instruct the object that implements
IManagable to start working. |
void |
stopRunning()
Instruct the object that implements
IManagable to stop working. |
Map<ProcessState,Set<ProcessState>> |
targetProcessStates()
Provides the set of ProcessStates that a message in the specified state can be moved to, e.g.
|
protected void |
tellResourcesToStop()
must lead to a 'closeAllResources()' and runstate must be 'STOPPING'
if IPushingListener -> call closeAllResources()
if IPullingListener -> PullingListenerContainer has to call closeAllResources();
|
protected void |
throwEvent(String eventCode) |
String |
toString()
Returns a toString of this class by introspection and the toString() value of its listener.
|
protected void |
warn(String msg)
sends a warning to the log and to the messagekeeper of the adapter
|
configureTransactionAttributes, isTransacted, isTransacted, setTransacted, setTransactionTimeout
clone, equals, finalize, getClass, hashCode, notify, notifyAll, wait, wait, wait
getName
getApplicationContext, getName
getConfigurationClassLoader
setApplicationContext
getAdapter
getTransactionAttribute, getTransactionTimeout, getTxDef, setTransactionAttribute
public static final org.springframework.transaction.TransactionDefinition TXREQUIRED
public static final org.springframework.transaction.TransactionDefinition TXNEW_CTRL
public org.springframework.transaction.TransactionDefinition TXNEW_PROC
public static final String THREAD_CONTEXT_KEY_NAME
public static final String THREAD_CONTEXT_KEY_TYPE
public static final String RCV_CONFIGURED_MONITOR_EVENT
public static final String RCV_CONFIGURATIONEXCEPTION_MONITOR_EVENT
public static final String RCV_STARTED_RUNNING_MONITOR_EVENT
public static final String RCV_SHUTDOWN_MONITOR_EVENT
public static final String RCV_SUSPENDED_MONITOR_EVENT
public static final String RCV_RESUMED_MONITOR_EVENT
public static final String RCV_THREAD_EXIT_MONITOR_EVENT
public static final String RCV_MESSAGE_TO_ERRORSTORE_EVENT
public static final String RCV_MESSAGE_LOG_COMMENTS
public static final int RCV_SUSPENSION_MESSAGE_THRESHOLD
public static final int MAX_RETRY_INTERVAL
public static final String RETRY_FLAG_SESSION_KEY
protected RunStateManager runState
public boolean configurationSucceeded()
protected String getLogPrefix()
protected void info(String msg)
protected void warn(String msg)
protected void error(String msg, Throwable t)
protected void openAllResources() throws ListenerException, TimeoutException
ListenerException
TimeoutException
protected void tellResourcesToStop()
protected void closeAllResources()
protected void propagateName()
public void configure() throws ConfigurationException
IAdapter
to let the
receiver do things to initialize itself before the startListening
method is called.configure
in interface IConfigurable
configure
in class TransactionAttributes
ConfigurationException
- when initialization did not succeed.startRunning()
public void startRunning()
IManagable
IManagable
to start working.
The method does not wait for completion of the command; at return of this
method, the object might be still in the STARTING-runstatestartRunning
in interface IManagable
public void stopRunning()
IManagable
IManagable
to stop working.
The method does not wait for completion of the command; at return of this
method, the object might be still in the STOPPING-runstatestopRunning
in interface IManagable
public Set<ProcessState> knownProcessStates()
IHasProcessState
knownProcessStates
in interface IHasProcessState<Object>
public Map<ProcessState,Set<ProcessState>> targetProcessStates()
IHasProcessState
targetProcessStates
in interface IHasProcessState<Object>
public M changeProcessState(Object message, ProcessState toState, String reason) throws ListenerException
IHasProcessState
false
is returned.changeProcessState
in interface IHasProcessState<Object>
ListenerException
public IMessageBrowser<Object> getMessageBrowser(ProcessState state)
IProvidesMessageBrowsers
browser
of messages that are in ProcessState 'state', and are stored in a
storage managed by the listener itself (as opposed to a storage configured as a messageLog or errorStorage in the configuration).getMessageBrowser
in interface IProvidesMessageBrowsers<Object>
protected void startProcessingMessage(long waitingDuration)
protected void finishProcessingMessage(long processingDuration)
public void moveInProcessToError(String originalMessageId, String correlationId, ThrowingSupplier<Message,ListenerException> messageSupplier, Date receivedDate, String comments, Object rawMessage, org.springframework.transaction.TransactionDefinition txDef)
public Message processRequest(IListener<M> origin, M rawMessage, Message message, PipeLineSession session) throws ListenerException
processRequest(IListener, Object, Message, PipeLineSession)
.
A messageId is generated that is unique and consists of the name of this listener and a GUID
N.B. callers of this method should clear the remaining ThreadContext if its not to be returned to their callers.processRequest
in interface IMessageHandler<M>
ListenerException
public void processRawMessage(IListener<M> origin, M rawMessage) throws ListenerException
IMessageHandler
IMessageHandler.processRawMessage(IListener,Object,PipeLineSession, boolean)
, but now without context, for convenienceprocessRawMessage
in interface IMessageHandler<M>
ListenerException
public void processRawMessage(IListener<M> origin, M rawMessage, PipeLineSession session, boolean duplicatesAlreadyChecked) throws ListenerException
IMessageHandler
processRawMessage
in interface IMessageHandler<M>
ListenerException
public void processRawMessage(IListener<M> origin, M rawMessage, PipeLineSession session, long waitingDuration, boolean duplicatesAlreadyChecked) throws ListenerException
IMessageHandler
IMessageHandler.processRawMessage(IListener,Object,PipeLineSession, boolean)
, but now updates IdleStatistics tooprocessRawMessage
in interface IMessageHandler<M>
ListenerException
public void retryMessage(String storageKey) throws ListenerException
ListenerException
public void cacheProcessResult(String messageId, String errorMessage, Date receivedDate)
public boolean hasProblematicHistory(String messageId, boolean manualRetry, Object rawMessageOrWrapper, ThrowingSupplier<Message,ListenerException> messageSupplier, Map<String,Object> threadContext, String correlationId) throws ListenerException
ListenerException
public void exceptionThrown(INamedObject object, Throwable t)
IbisExceptionListener
t
occurred in object
.exceptionThrown
in interface IbisExceptionListener
public String getEventSourceName()
getEventSourceName
in interface EventThrowing
protected void registerEvent(String eventCode)
protected void throwEvent(String eventCode)
public void resetRetryInterval()
public void increaseRetryIntervalAndWait(Throwable t, String description)
public void iterateOverStatistics(StatisticsKeeperIterationHandler hski, Object data, HasStatistics.Action action) throws SenderException
iterateOverStatistics
in interface HasStatistics
SenderException
public boolean isThreadCountReadable()
isThreadCountReadable
in interface IThreadCountControllable
public boolean isThreadCountControllable()
isThreadCountControllable
in interface IThreadCountControllable
public int getCurrentThreadCount()
getCurrentThreadCount
in interface IThreadCountControllable
public int getMaxThreadCount()
getMaxThreadCount
in interface IThreadCountControllable
public void increaseThreadCount()
increaseThreadCount
in interface IThreadCountControllable
public void decreaseThreadCount()
decreaseThreadCount
in interface IThreadCountControllable
@Protected public void setRunState(RunState state)
public RunState getRunState()
runstate
of this receiver.getRunState
in interface IManagable
public boolean isInRunState(RunState someRunState)
public Message formatException(String extrainfo, String messageId, Message message, Throwable t)
IMessageHandler
formatException
in interface IMessageHandler<M>
public org.springframework.beans.factory.BeanFactory getBeanFactory()
public void setBeanFactory(org.springframework.beans.factory.BeanFactory beanFactory)
setBeanFactory
in interface org.springframework.beans.factory.BeanFactoryAware
public PullingListenerContainer<M> getListenerContainer()
public void setListenerContainer(PullingListenerContainer<M> listenerContainer)
public PullingListenerContainer<M> createListenerContainer()
protected StatisticsKeeper getProcessStatistics(int threadsProcessing)
protected StatisticsKeeper getIdleStatistics(int threadsProcessing)
public Iterable<StatisticsKeeper> getProcessStatistics()
getProcessStatistics
in interface IReceiverStatistics
public Iterable<StatisticsKeeper> getIdleStatistics()
getIdleStatistics
in interface IReceiverStatistics
public Iterable<StatisticsKeeper> getQueueingStatistics()
public boolean isOnErrorContinue()
public long getMessagesReceived()
public long getMessagesRetried()
public long getMessagesRejected()
public long getLastMessageDate()
public void resetNumberOfExceptionsCaughtWithoutMessageBeingReceived()
public String toString()
public void setListener(IListener<M> newListener)
public void setSender(ISender sender)
PipeLine
) should be sent. Applies if the receiver
has an asynchronous listener.
N.B. Sending correlated responses via this sender is not supported.@Deprecated @ConfigurationWarning(value="In-Process Storage no longer exists") public void setInProcessStorage(ITransactionalStorage<Serializable> inProcessStorage)
inProcessStorage
- The inProcessStorage to setpublic void setErrorSender(ISender errorSender)
SUCCESS
.
Applies if the receiver has an asynchronous listener.public void setErrorStorage(ITransactionalStorage<Serializable> errorStorage)
public void setMessageLog(ITransactionalStorage<Serializable> messageLog)
public void setName(String newName)
setName
in interface INamedObject
public void setOnError(Receiver.OnError value)
public void setNumThreads(int newNumThreads)
public void setNumThreadsPolling(int i)
numthreads
' (only for pulling listeners)public void setPollInterval(int i)
public void setStartTimeout(int i)
public void setStopTimeout(int i)
public void setCheckForDuplicates(boolean b)
true
, each message is checked for presence in the messageLog. If already present, it is not processed again. Only required for non XA compatible messaging. Requires messageLog!public void setCheckForDuplicatesMethod(Receiver.CheckForDuplicatesMethod method)
checkForDuplicates=true
) Indicates whether the messageid or the correlationid is used for checking presence in the message logpublic void setMaxDeliveries(int i)
public void setMaxRetries(int i)
maxRetries < 0
the number of attempts is infinitepublic void setProcessResultCacheSize(int processResultCacheSize)
@Deprecated @ConfigurationWarning(value="attribute is no longer used. Please use attribute returnedSessionKeys of the JavaListener if the set of sessionsKeys that can be returned to callers session must be limited.") public void setReturnedSessionKeys(String string)
public void setCorrelationIDXPath(String string)
public void setCorrelationIDNamespaceDefs(String correlationIDNamespaceDefs)
prefix=namespaceuri
-definitionspublic void setCorrelationIDStyleSheet(String string)
public void setLabelXPath(String string)
public void setLabelNamespaceDefs(String labelNamespaceDefs)
prefix=namespaceuri
-definitionspublic void setLabelStyleSheet(String string)
public void setChompCharSize(String string)
public void setElementToMove(String string)
elementToMoveSessionKey
+ }public void setElementToMoveSessionKey(String string)
elementToMove
is set) Name of the session key under which the character data is storedpublic void setElementToMoveChain(String string)
elementToMove
but element is preceded with all ancestor elements and separated by semicolons (e.g. adapter;pipeline;pipe)public void setRemoveCompactMsgNamespaces(boolean b)
public void setHideRegex(String hideRegex)
public void setHideMethod(IMessageBrowser.HideMethod hideMethod)
public void setHiddenInputSessionKeys(String string)
PipelineSession
is created and of which the value will not be shown in the log (replaced by asterisks)public void setForceRetryFlag(boolean b)
true
, every message read will be processed as if it is being retried, by setting a session variable to "retry".public void setNumberOfExceptionsCaughtWithoutMessageBeingReceivedThreshold(int number)
Copyright © 2023 Frank!Framework. All rights reserved.