Class PipeLine

All Implemented Interfaces:
ICacheEnabled<String,String>, HasTransactionAttribute, IConfigurationAware, IScopeProvider, HasStatistics, org.springframework.beans.factory.Aware, org.springframework.context.ApplicationContextAware

@Category("Basic") public class PipeLine extends TransactionAttributes implements ICacheEnabled<String,String>, HasStatistics, IConfigurationAware
Required in each Adapter to transform incoming messages. A pipeline is a sequence of pipes. A pipeline also defines its allowed end states using the <Exits> tag.

The pipes in a PipeLine may not be executed in sequential order, see PipeForward.

A pipeline gathers statistics about the messages it processes.

In the AppConstants there may be a property named log.logIntermediaryResults (true/false) which indicates whether the intermediary results (between calling pipes) have to be logged.

Transaction control

Note: the following description is to be updated; the attribute 'transacted' has been replaced by 'transactionAttribute'.

If transacted is set to true, messages are processed under transaction control. Processing by XA-compliant pipes (i.e. pipes that implement the IXAEnabled interface, set their transacted attribute to true and use XA-compliant resources) is then either committed or rolled back in one transaction.

If transacted is set to true, either an existing transaction (started by a transactional receiver) is joined, or a new one is created (if the message processing request is not initiated by a receiver under transaction control).

Messages are only committed or rolled back by the PipeLine if it started the transaction itself. If the pipeline joined an existing transaction, the commit or rollback is left to the object that started the transaction, i.e. the receiver. In that case the pipeline can indicate to the receiver that the transaction should be rolled back (by calling UserTransaction.setRollbackOnly()).

The choice between commit and rollback (by either Pipeline or Receiver) is made as follows: if the processing of the message concluded without exceptions and the status of the transaction is STATUS_ACTIVE (i.e. normal), the transaction is committed. Otherwise it is rolled back, or marked for rollback by the calling party.
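
The commit-or-rollback rule described above can be sketched as a small decision function. This is a minimal illustration under stated assumptions, not the framework's code: TransactionDecisionSketch, TxStatus, Outcome and decide are hypothetical names, and TxStatus only mirrors the idea of the JTA status constants.

```java
/** Hypothetical sketch of the commit/rollback rule; not the framework's implementation. */
final class TransactionDecisionSketch {

    /** Simplified stand-in for the JTA transaction status (assumption). */
    enum TxStatus { ACTIVE, MARKED_ROLLBACK }

    /** What the pipeline does once message processing has finished. */
    enum Outcome { COMMIT, ROLLBACK, MARK_ROLLBACK_ONLY, LEAVE_TO_OWNER }

    static Outcome decide(boolean startedByPipeline, boolean exceptionThrown, TxStatus status) {
        // "Normal" means: no exception and the transaction is still active.
        boolean healthy = !exceptionThrown && status == TxStatus.ACTIVE;
        if (startedByPipeline) {
            // The pipeline owns the transaction, so it finishes it itself.
            return healthy ? Outcome.COMMIT : Outcome.ROLLBACK;
        }
        // The pipeline joined a transaction: the owner (e.g. the receiver) finishes it,
        // and the pipeline can only mark it for rollback.
        return healthy ? Outcome.LEAVE_TO_OWNER : Outcome.MARK_ROLLBACK_ONLY;
    }
}
```

In this sketch a pipeline that joined a receiver's transaction never commits or rolls back itself; it either leaves the transaction untouched or marks it rollback-only.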
Author:
Johan Verrips
  • Constructor Details

    • PipeLine

      public PipeLine()
  • Method Details

    • getPipe

      public IPipe getPipe(String pipeName)
    • getPipe

      public IPipe getPipe(int index)
    • registerExitHandler

      public void registerExitHandler(IPipeLineExitHandler exitHandler)
    • configure

      public void configure() throws ConfigurationException
      Configures the pipes of this Pipeline and does some basic checks. It also registers the PipeLineSession object at the pipes.
      Overrides:
      configure in class TransactionAttributes
      Throws:
      ConfigurationException
    • configure

      public void configure(IPipe pipe) throws ConfigurationException
      Throws:
      ConfigurationException
    • configurationSucceeded

      public boolean configurationSucceeded()
    • findExitByState

      public Optional<PipeLineExit> findExitByState(PipeLine.ExitState state)
    • getPipeLineSize

      public int getPipeLineSize()
      Returns:
      the number of pipes in the pipeline
    • getPipeStatistics

      @Nonnull public io.micrometer.core.instrument.DistributionSummary getPipeStatistics(IConfigurationAware pipe)
    • getPipeWaitStatistics

      @Nonnull public io.micrometer.core.instrument.DistributionSummary getPipeWaitStatistics(IPipe pipe)
    • getPipeSizeInStatistics

      @Nonnull public io.micrometer.core.instrument.DistributionSummary getPipeSizeInStatistics(IPipe pipe)
    • getPipeSizeOutStatistics

      @Nonnull public io.micrometer.core.instrument.DistributionSummary getPipeSizeOutStatistics(IPipe pipe)
    • process

      public PipeLineResult process(String messageId, Message message, PipeLineSession pipeLineSession) throws PipeRunException
      The process method does the processing of a message.
      It retrieves the first pipe to execute from the firstPipe field. Each call results in a PipeRunResult, which contains the next pipe to activate. While processing, the process method keeps statistics.
      Parameters:
      message - The message as received from the Listener
      messageId - A unique id for this message, used for logging purposes.
      Returns:
      the result of the processing.
      Throws:
      PipeRunException - when something went wrong in the pipes.
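
      The forward-driven control flow described for process can be sketched as a loop in which each pipe names its successor. This is a simplified model under stated assumptions: ForwardLoopSketch, StepResult and run are hypothetical names, pipes are modelled as plain functions, and any target that is not a known pipe is treated as an exit.

```java
import java.util.Map;
import java.util.function.Function;

/** Hypothetical sketch of forward-driven pipe execution; not the framework's implementation. */
final class ForwardLoopSketch {

    /** Result of one pipe: the transformed message and the name of the next target. */
    static final class StepResult {
        final String message;
        final String next;

        StepResult(String message, String next) {
            this.message = message;
            this.next = next;
        }
    }

    /** Runs pipes starting at firstPipe until the next target is not a known pipe (an exit). */
    static String run(Map<String, Function<String, StepResult>> pipes, String firstPipe, String input) {
        String current = firstPipe;
        String message = input;
        int steps = 0;
        while (pipes.containsKey(current)) {
            if (++steps > 1000) {
                // Guard added for this sketch only, to stop circular forwards.
                throw new IllegalStateException("forward loop suspected at pipe " + current);
            }
            StepResult result = pipes.get(current).apply(message);
            message = result.message;
            current = result.next; // the forward decides the order, not the declaration order
        }
        return current + ":" + message; // exit name plus final message
    }
}
```

      Because the next pipe comes from the result of the previous one, pipes declared as a, b, c may well run in the order a, c, b.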
    • resolveForward

      public IForwardTarget resolveForward(IPipe pipe, PipeForward forward) throws PipeRunException
      Find the destination of the forward, i.e. the object (Pipe or PipeLineExit) that the forward points to.
      Throws:
      PipeRunException
    • setAdapter

      public void setAdapter(Adapter adapter)
      Register the Adapter that this PipeLine belongs to.
      Parameters:
      adapter - the Adapter that contains this PipeLine
    • start

      public void start() throws PipeStartException
      Throws:
      PipeStartException
    • startPipe

      protected void startPipe(String type, IPipe pipe) throws PipeStartException
      Throws:
      PipeStartException
    • stop

      public void stop()
      Close the pipeline. This calls the stop() method of all registered pipes.
    • stopPipe

      protected void stopPipe(String type, IPipe pipe)
    • getName

      public String getName()
      Specified by:
      getName in interface IConfigurationAware
    • toString

      public String toString()
      Overrides:
      toString in class Object
      Returns:
      an enumeration of all pipe names in the pipeline, plus the start pipe and end path
    • setInputValidator

      public void setInputValidator(IValidator inputValidator)
      Request validator, or combined validator for request and response
    • setOutputValidator

      public void setOutputValidator(IValidator outputValidator)
      Optional pipe to validate the response. Can be specified if the response cannot be validated by the request validator
    • setInputWrapper

      public void setInputWrapper(IWrapperPipe inputWrapper)
      Optional pipe to extract the request message from its envelope
    • setOutputWrapper

      public void setOutputWrapper(IWrapperPipe outputWrapper)
      Optional pipe to wrap the response message in an envelope
    • setPipeLineExits

      public void setPipeLineExits(PipeLineExits exits)
      PipeLine exits. If no exits are specified, a default one is created with name="READY" and state="SUCCESS"
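
      The documented default can be sketched as follows. This is an illustration only: DefaultExitSketch and effectiveExits are hypothetical names, and exits are modelled as a plain name-to-state map rather than as PipeLineExit objects.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of the default-exit rule; not the framework's implementation. */
final class DefaultExitSketch {

    /** Returns the configured exits (name -> state), or a single default exit when none are given. */
    static Map<String, String> effectiveExits(Map<String, String> configured) {
        if (configured != null && !configured.isEmpty()) {
            return configured;
        }
        Map<String, String> defaults = new LinkedHashMap<>();
        defaults.put("READY", "SUCCESS"); // default documented for setPipeLineExits
        return defaults;
    }
}
```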
    • registerPipeLineExit

      @Deprecated public void registerPipeLineExit(PipeLineExit exit)
      Deprecated.
      PipeLine exits.
    • setGlobalForwards

      public void setGlobalForwards(PipeForwards forwards)
      Optional global forwards that are added to every pipe when the forward name has not been explicitly set on that pipe. For example, <forward name="exception" path="error_exception" /> adds the 'exception' forward to every pipe in the pipeline.
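
      The precedence between a pipe's own forwards and the global forwards can be sketched as a two-step lookup. This is a simplified model: GlobalForwardSketch and resolve are hypothetical names, and forwards are modelled as name-to-path maps rather than as PipeForward objects.

```java
import java.util.Map;

/** Hypothetical sketch of forward lookup with global fallback; not the framework's implementation. */
final class GlobalForwardSketch {

    /** Looks up a forward path: the pipe's own forwards win, global forwards fill the gaps. */
    static String resolve(Map<String, String> pipeForwards, Map<String, String> globalForwards, String name) {
        String path = pipeForwards.get(name);
        // Returns null when neither the pipe nor the pipeline defines the forward.
        return path != null ? path : globalForwards.get(name);
    }
}
```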
    • registerForward

      @Deprecated public void registerForward(PipeForward forward)
      Deprecated.
    • setLocker

      public void setLocker(Locker locker)
      Optional Locker, to avoid parallel execution of the PipeLine by multiple threads on multiple servers. The PipeLine is NOT executed (and is considered to have ended successfully) when the lock cannot be obtained, e.g. when another thread, possibly on another server, holds the lock and does not release it in a timely manner. If only the number of threads executing this PipeLine needs to be limited, the attribute maxThreads can be set instead, avoiding the database overhead.
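
      The skip-when-locked behaviour can be sketched as below. This is a single-JVM stand-in under stated assumptions: LockerSkipSketch, SKIPPED and runIfLockObtained are hypothetical names, and a one-permit java.util.concurrent.Semaphore replaces the database-backed Locker purely for illustration.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical sketch of "skip when the lock is unavailable"; not the framework's implementation. */
final class LockerSkipSketch {

    /** Marker result for a run that was skipped but still counts as successful. */
    static final String SKIPPED = "SUCCESS (skipped, lock not obtained)";

    static String runIfLockObtained(Semaphore lock, Supplier<String> pipeline) {
        if (!lock.tryAcquire()) {
            // Someone else holds the lock: do not execute, report success.
            return SKIPPED;
        }
        try {
            return pipeline.get();
        } finally {
            lock.release(); // always give the lock back, even if the pipeline throws
        }
    }
}
```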
    • setCache

      public void setCache(ICache<String,String> cache)
      Cache of results
      Specified by:
      setCache in interface ICacheEnabled<String,String>
    • addPipe

      public void addPipe(IPipe pipe) throws ConfigurationException
      Register a Pipe at this pipeline. The name is also put in the globalForwards table (with forward-name=pipename and forward-path=pipename), so that a pipe can look for a specific pipe name. If a globalForward already exists under that name, the pipe is NOT added, allowing globalForwards to prevail.
      Throws:
      ConfigurationException
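
      The registration rule for pipe names can be sketched with a map whose existing entries are never overwritten. This is an illustration only: AddPipeSketch and registerPipeName are hypothetical names, and the sketch covers only the globalForwards bookkeeping, not the pipe list itself.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of pipe-name registration in the global forwards; not the framework's implementation. */
final class AddPipeSketch {

    /** Forward name -> forward path. */
    final Map<String, String> globalForwards = new LinkedHashMap<>();

    /** Registers pipeName as a forward to itself unless a global forward of that name already exists. */
    boolean registerPipeName(String pipeName) {
        // putIfAbsent keeps an existing entry, so explicitly configured global forwards prevail.
        return globalForwards.putIfAbsent(pipeName, pipeName) == null;
    }
}
```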
    • setFirstPipe

      public void setFirstPipe(String pipeName)
      Name of the first pipe to execute when a message is to be processed
      Default value
      first pipe of the pipeline
    • setMaxThreads

      public void setMaxThreads(int newMaxThreads)
      Maximum number of threads that may execute this PipeLine simultaneously. Use 0 to disable the limit.
      Default value
      0
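
      A thread limit of this kind can be sketched with a counting semaphore. This is a sketch under stated assumptions: MaxThreadsSketch, isLimited and execute are hypothetical names, and the choice to let excess callers wait (rather than fail) is an assumption, not something this page states.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

/** Hypothetical sketch of a maxThreads limit; not the framework's implementation. */
final class MaxThreadsSketch {

    private final Semaphore permits; // null means "no limit" (maxThreads = 0)

    MaxThreadsSketch(int maxThreads) {
        this.permits = maxThreads > 0 ? new Semaphore(maxThreads) : null;
    }

    boolean isLimited() {
        return permits != null;
    }

    <T> T execute(Supplier<T> work) {
        if (permits == null) {
            return work.get(); // limit disabled
        }
        permits.acquireUninterruptibly(); // assumption: excess callers wait for a free slot
        try {
            return work.get();
        } finally {
            permits.release();
        }
    }
}
```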
    • setStoreOriginalMessageWithoutNamespaces

      public void setStoreOriginalMessageWithoutNamespaces(boolean b)
      If set to true, the original message without namespaces (and prefixes) is stored under the session key originalMessageWithoutNamespaces
      Default value
      false
    • setMessageSizeWarn

      public void setMessageSizeWarn(String s)
      If messageSizeWarn>=0 and the size of the input or result pipe message exceeds the specified value, a warning message is logged. The value can be specified with the suffixes KB, MB or GB
      Default value
      application default (30MB)
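
      How a value such as 30MB could be turned into a byte threshold is sketched below. This is not the framework's parser: MessageSizeSketch, toBytes and exceeds are hypothetical names, and the use of binary multiples (1 KB = 1024 bytes) is an assumption.

```java
import java.util.Locale;

/** Hypothetical sketch of size-suffix parsing; not the framework's implementation. */
final class MessageSizeSketch {

    /** Converts "512", "2 KB", "30MB" or "1GB" to a number of bytes (1 KB = 1024 bytes, assumed). */
    static long toBytes(String value) {
        String v = value.trim().toUpperCase(Locale.ROOT);
        long factor = 1L;
        if (v.endsWith("KB")) {
            factor = 1024L;
        } else if (v.endsWith("MB")) {
            factor = 1024L * 1024L;
        } else if (v.endsWith("GB")) {
            factor = 1024L * 1024L * 1024L;
        }
        // Strip the two-letter suffix when one was recognised.
        String digits = factor == 1L ? v : v.substring(0, v.length() - 2).trim();
        return Long.parseLong(digits) * factor;
    }

    /** A warning applies only when the threshold is non-negative and the message is larger. */
    static boolean exceeds(long messageSize, long warnThreshold) {
        return warnThreshold >= 0 && messageSize > warnThreshold;
    }
}
```

      With these assumptions, toBytes("30MB") yields 31457280 bytes, and a negative threshold disables the warning.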
    • getMessageSizeWarnNum

      public long getMessageSizeWarnNum()
    • setTransformNullMessage

      public void setTransformNullMessage(String s)
      When specified and null is received as a message, the message is changed to the specified value
    • setAdapterToRunBeforeOnEmptyInput

      @Deprecated @ConfigurationWarning("Please use an XmlIf-pipe and call a sub-adapter to retrieve a new/different response") public void setAdapterToRunBeforeOnEmptyInput(String s)
      Deprecated.
      When specified and an empty message is received, the specified adapter is run before passing the message (the response from the specified adapter) to the pipeline
    • getApplicationContext

      public org.springframework.context.ApplicationContext getApplicationContext()
      Specified by:
      getApplicationContext in interface IConfigurationAware
    • setApplicationContext

      public void setApplicationContext(org.springframework.context.ApplicationContext applicationContext)
      Specified by:
      setApplicationContext in interface org.springframework.context.ApplicationContextAware
    • getConfigurationClassLoader

      public ClassLoader getConfigurationClassLoader()
      Description copied from interface: IScopeProvider
      This ClassLoader is set upon creation of the object, used to retrieve resources configured by the Ibis application.
      Specified by:
      getConfigurationClassLoader in interface IScopeProvider
      Returns:
      returns the ClassLoader created by the ClassLoaderManager.
    • setConfigurationMetrics

      public void setConfigurationMetrics(MetricsInitializer configurationMetrics)
    • getFirstPipe

      public String getFirstPipe()
    • getMaxThreads

      public int getMaxThreads()
    • isStoreOriginalMessageWithoutNamespaces

      public boolean isStoreOriginalMessageWithoutNamespaces()
    • getAdapterToRunBeforeOnEmptyInput

      public String getAdapterToRunBeforeOnEmptyInput()
    • getInputValidator

      public IValidator getInputValidator()
    • getOutputValidator

      public IValidator getOutputValidator()
    • getInputWrapper

      public IWrapperPipe getInputWrapper()
    • getOutputWrapper

      public IWrapperPipe getOutputWrapper()
    • getPipeLineExits

      public Map<String,PipeLineExit> getPipeLineExits()
    • getGlobalForwards

      public Map<String,PipeForward> getGlobalForwards()
    • getLocker

      public Locker getLocker()
    • getCache

      public ICache<String,String> getCache()
      Specified by:
      getCache in interface ICacheEnabled<String,String>
    • getPipes

      public List<IPipe> getPipes()
    • getAdapter

      public Adapter getAdapter()
      Specified by:
      getAdapter in interface HasStatistics
    • setOwner

      public void setOwner(INamedObject owner)
    • getOwner

      public INamedObject getOwner()
    • setPipeLineProcessor

      public void setPipeLineProcessor(PipeLineProcessor pipeLineProcessor)
    • getRequestSizeStats

      public io.micrometer.core.instrument.DistributionSummary getRequestSizeStats()
    • getPipelineWaitStatistics

      public io.micrometer.core.instrument.DistributionSummary getPipelineWaitStatistics()
    • getExitHandlers

      public List<IPipeLineExitHandler> getExitHandlers()