Package org.javalite.async
Class Async
java.lang.Object
org.javalite.async.Async
Wrapper for embedded Apache ActiveMQ Artemis. It is an embedded in-memory
JMS server for asynchronous processing. JavaLite Async can be used in standalone applications,
but specifically useful in web apps for processing asynchronous jobs without delaying
rendering web responses.
It sets many configuration parameters of Artemis
EmbeddedActiveMQ
to
sensible values so you do not have to.
This class also implements a Command Pattern for ease of writing asynchronous code.- Author:
- Igor Polevoy on 3/4/15.
-
Nested Class Summary
-
Constructor Summary
ConstructorDescriptionAsync(String dataDirectory, boolean useLibAio, com.google.inject.Injector injector, QueueConfig... queueConfigs)
Creates and configures a new instance.Async(String dataDirectory, boolean useLibAio, QueueConfig... queueConfigs)
Creates and configures a new instance. -
Method Summary
Modifier and TypeMethodDescriptionvoid
configureNetty(String host, int port)
Call this method once after a constructor in order to create a Netty instance to accept out of VM messages.getBatchReceiver(String queueName, long timeout)
Generate aBatchReceiver
to receive and process stored messages.static byte[]
getBytes(javax.jms.BytesMessage message)
org.apache.activemq.artemis.core.config.Configuration
Get additional server configuration.long
getMessageCount(String queue)
Returns number of messages currently in queueReturns counts of messages for all queues.getTopCommands(int count, String queueName)
Returns top commands in queue.getTopTextMessages(int maxSize, String queueName)
Returns topTextMessage
s in queue.boolean
boolean
moveMessage(String jmsMessageId, String source, String target)
Moves a message from one queue to anotherint
moveMessages(String source, String target)
Moves all messages from one queue to anothervoid
Pauses a queue.receiveCommand(String queueName)
Receives a command from a queue synchronously.<T extends Command>
TreceiveCommand(String queueName, int timeout, Class<T> type)
Receives a command from a queue synchronously.receiveCommand(String queueName, long timeout)
Receives a command from a queue synchronously.<T extends Command>
TreceiveCommand(String queueName, Class<T> type)
Receives a command from a queue synchronously.javax.jms.Message
receiveMessage(String queueName, long timeout)
Receives a messafge from a queue asynchronously.If this queue also has listeners, then messages will be distributed across all consumers.int
removeAllMessages(String queueName)
Removes all messages from queue.int
removeMessages(String queueName, String filter)
Removes messages from queue.void
Resumes a paused queuevoid
Sends a command into a queue for processingvoid
Sends a command into a queue for processingvoid
Sends a command into a queue for processingvoid
send(String queueName, Command command, int deliveryMode, int priority, long timeToLive, long deliveryTime)
Sends a command into a queue for processingvoid
Sends a command into a queue for processingvoid
Sends a command into a queue for processingvoid
sendTextMessage(String queueName, String text)
Sends a non-expiringTextMessage
with average priority.void
sendTextMessage(String queueName, String text, int deliveryMode, int priority, long timeToLive)
Sends aTextMessage
.void
sendTextMessage(String queueName, String text, int deliveryMode, int priority, long timeToLive, long deliveryTime)
Sends aTextMessage
.void
sendTextMessage(String queueName, String text, long deliveryTime)
Sends a non-expiringTextMessage
with average priority.void
setBinaryMode(boolean binaryMode)
If true, uses binary mode to send messages.void
setTopicConfigsList(List<TopicConfig> topicConfigsList)
void
start()
Starts the server.void
stop()
Stops this JMS server.
-
Constructor Details
-
Async
Creates and configures a new instance. However, it is recommended to use a builder to create Async instances:new Async.AsyncBuilder(...)...
.- Parameters:
dataDirectory
- root directory where persistent messages are storeduseLibAio
- true to use libaio, false not to use (See Artemis log statements to check if it was detected).queueConfigs
- vararg of QueueConfig instances.
-
Async
public Async(String dataDirectory, boolean useLibAio, com.google.inject.Injector injector, QueueConfig... queueConfigs)Creates and configures a new instance. However, it is recommended to use a builder to create Async instances:new Async.AsyncBuilder(...)...
.- Parameters:
dataDirectory
- root directory where persistent messages are storeduseLibAio
- true to use libaio, false to use NIO.injector
- Google Guice injector. Used to inject dependency members into commands if needed.queueConfigs
- vararg of QueueConfig> instances.
-
-
Method Details
-
start
public void start()Starts the server. -
stop
public void stop()Stops this JMS server. -
setBinaryMode
public void setBinaryMode(boolean binaryMode)If true, uses binary mode to send messages. If set to false (default), will send messages as strings. Test which method is faster in your environment for your CPU and IO performance. generally, binary mode will use a lot less IO, but more CPU and vice versa.- Parameters:
binaryMode
- true to send messages in binary mode, false to send as strings.
-
configureNetty
Call this method once after a constructor in order to create a Netty instance to accept out of VM messages.- Parameters:
host
- host to bind toport
- port to listen on
-
send
Sends a command into a queue for processing- Parameters:
queueName
- name of queuecommand
- command instance.
-
send
Sends a command into a queue for processing- Parameters:
queueName
- name of queuecommand
- command instance.deliveryTime
- delivery time in the future. Ifnull
, or in the past, the message will be delivered as usual.
-
send
Sends a command into a queue for processing- Parameters:
queueName
- name of queuecommand
- command to processdeliveryMode
- delivery mode:DeliveryMode
.
-
send
Sends a command into a queue for processing- Parameters:
queueName
- name of queuecommand
- command to processdeliveryMode
- delivery mode:DeliveryMode
.deliveryTime
- delivery time in milliseconds
-
send
public void send(String queueName, Command command, int deliveryMode, int priority, long timeToLive)Sends a command into a queue for processing- Parameters:
queueName
- name of queuecommand
- command to processdeliveryMode
- delivery mode:DeliveryMode
.priority
- priority of the message. Correct values are from 0 to 9, with higher number denoting a higher priority.timeToLive
- the message's lifetime (in milliseconds, where 0 is to never expire)
-
send
public void send(String queueName, Command command, int deliveryMode, int priority, long timeToLive, long deliveryTime)Sends a command into a queue for processing- Parameters:
queueName
- name of queuecommand
- command to processdeliveryMode
- delivery mode:DeliveryMode
. 1 for non-persistent, 2 for persistent.priority
- priority of the message. Correct values are from 0 to 9, with higher number denoting a higher priority.timeToLive
- the message's lifetime (in milliseconds, where 0 is to never expire)deliveryTime
- The specified value must be a positive long corresponding to the time the message must be delivered (in milliseconds). For instance,System.currentTimeMillis() + 5000
would be 5 seconds from now.
-
receiveCommand
Receives a command from a queue synchronously. If this queue also has listeners, then commands will be distributed across all consumers.- Parameters:
queueName
- name of queue- Returns:
- command if found. If command not found, this method will block till a command is present in queue.
see
receiveCommand(String, long)
-
receiveCommand
Receives a command from a queue synchronously. If this queue also has listeners, then commands will be distributed across all consumers. This method will block until a command becomes available for this consumer.- Parameters:
queueName
- name of queuetype
- expected class of a command- Returns:
- command if found. If command not found, this method will block till a command is present in queue.
see
receiveCommand(String, long)
-
receiveCommand
Receives a command from a queue synchronously. If this queue also has listeners, then commands will be distributed across all consumers.- Parameters:
queueName
- name of queuetimeout
- timeout in milliseconds. If a command is not received during a timeout, this methods returns null.type
- expected class of a command- Returns:
- command if found. If command not found, this method will block till a command is present in queue.
see
receiveCommand(String, long)
-
receiveCommand
Receives a command from a queue synchronously. If this queue also has listeners, then commands will be distributed across all consumers.- Parameters:
queueName
- name of queuetimeout
- timeout in milliseconds. If a command is not received during a timeout, this methods returns null.- Returns:
- command if found. If command not found, this method will block till a command is present in queue or a timeout expires.
-
receiveMessage
Receives a messafge from a queue asynchronously.If this queue also has listeners, then messages will be distributed across all consumers.- Parameters:
queueName
- name of queuetimeout
- timeout in millis.- Returns:
- message if found, null if not.
-
sendTextMessage
Sends a non-expiringTextMessage
with average priority.- Parameters:
queueName
- name of queuetext
- body of message
-
sendTextMessage
Sends a non-expiringTextMessage
with average priority.- Parameters:
queueName
- name of queuetext
- body of messagedeliveryTime
- The specified value must be a positive long corresponding to the time the message must be delivered (in milliseconds). For instance,System.currentTimeMillis() + 5000
would be 5 seconds from now.
-
sendTextMessage
public void sendTextMessage(String queueName, String text, int deliveryMode, int priority, long timeToLive)Sends aTextMessage
.- Parameters:
queueName
- name of queuetext
- body of messagedeliveryMode
- delivery mode:DeliveryMode
.priority
- priority of the message. Correct values are from 0 to 9, with higher number denoting a higher priority.timeToLive
- the message's lifetime (in milliseconds, where 0 is to never expire)
-
sendTextMessage
public void sendTextMessage(String queueName, String text, int deliveryMode, int priority, long timeToLive, long deliveryTime)Sends aTextMessage
.- Parameters:
queueName
- name of queuetext
- body of messagedeliveryMode
- delivery mode:DeliveryMode
.priority
- priority of the message. Correct values are from 0 to 9, with higher number denoting a higher priority.timeToLive
- the message's lifetime (in milliseconds, where 0 is to never expire)deliveryTime
- The specified value must be a positive long corresponding to the time the message must be delivered (in milliseconds). For instance,System.currentTimeMillis() + 5000
would be 5 seconds from now.
-
getBatchReceiver
Generate aBatchReceiver
to receive and process stored messages. This method ALWAYS works in the context of a transaction.- Parameters:
queueName
- name of queuetimeout
- timeout to wait.- Returns:
- instance of
BatchReceiver
.
-
getTopCommands
Returns top commands in queue. Does not remove anything from queue. This method can be used for an admin tool to peek inside the queue.- Parameters:
count
- number of commands to lookup.- Returns:
- top commands in queue or empty list is nothing is found in queue.
-
getTopTextMessages
Returns topTextMessage
s in queue. Does not remove anything from queue. This method can be used for an admin tool to peek inside the queue.- Parameters:
maxSize
- max number of messages to lookup.- Returns:
- top commands in queue or empty list is nothing is found in queue.
-
getBytes
public static byte[] getBytes(javax.jms.BytesMessage message) throws javax.jms.JMSException- Throws:
javax.jms.JMSException
-
getMessageCounts
Returns counts of messages for all queues.- Returns:
- map, where a key is a queue name, and value is a number of messages currently in that queue.0
-
getMessageCount
Returns number of messages currently in queue- Parameters:
queue
- queue name- Returns:
- number of messages currently in queue
-
resume
Resumes a paused queue- Parameters:
queueName
- queue name
-
pause
Pauses a queue. A paused queue stops delivering commands to listeners. It still can accumulate commands.- Parameters:
queueName
- queue name.
-
isPaused
- Parameters:
queueName
- queue name- Returns:
- true if queue is paused, false if not.
-
removeMessages
Removes messages from queue.- Parameters:
queueName
- queue namefilter
- filter selector as in JMS specification. See: JMS Message Selectors- Returns:
- number of messages removed
-
removeAllMessages
Removes all messages from queue.- Parameters:
queueName
- queue name.- Returns:
- number of messages removed
-
moveMessages
Moves all messages from one queue to another- Parameters:
source
- name of source queuetarget
- name of target queue- Returns:
- number of messages moved
-
moveMessage
Moves a message from one queue to another- Parameters:
jmsMessageId
- JMS message id of a message to movesource
- name of source queuetarget
- name of target queue- Returns:
- true if message moved
-
setTopicConfigsList
-
getConfig
public org.apache.activemq.artemis.core.config.Configuration getConfig()Get additional server configuration.
-