Hi, I have a scenario which I believe can be solved in many different ways. I am currently focusing on SEDA components for that but not really getting what I want. The scenario consists of distributing load among nodes based on nodes' availability. Nodes (for the sake of simplicity of this example they are 2) will pick up messages from a queue and process them. If nothing in the queue then the nodes will simply sleep. If both nodes are sleeping when a message arrives, then only one node should be a able to pick up the message. If both nodes are busy then the messages will simply queue up. Processing of a message by a node can take an arbitrary amount of time (depending on the individual content of a message). It is possible a node may potentially pick up and process 2 (or more) light messages from the queue, in a row, if the other node is busy on a fat message meanwhile. I believe that can be done with JMS without much of a hassle but I would prefer to do it without JMS get involved and I am trying SEDA for that. However I am not able to do it with cSEDA (or with cVM as in the attachment). I am trying different combinations of configuration to cSEDA or cVM components but either it does not work at all or it works sequentially, i.e. processing of the fat message blocks the other thread as well. Please tell me if the above scenario is possible in SEDA style or if I am barking at the wrong tree. The only example in Talend manual I could have found is connecting cSEDAs (or cVMs) one-to-one. Cheers.
Your example looks like a perfect fit for the cLoadBalancer ... just replace the cVM_1 and cVM_2 & cVM_3 with one cLoadBalancer component and then you get the distribution e.g. by round robin to you services behind. Would this fit to your use case? Dietmar
Hi Dietmar, I have tried the load balancer but it is not doing what I wanted. Or to be precise it does it only partially. LB distributes messages synchronously. That is two endpoints don't process messages in parallel. Once LB hands over to one processor it waits it until it returns. Then it calls the other endpoint. I want them to work asynchronously on different threads. When one of them is working on one large message the other one should be able to pick up small ones. Maybe with a custom load balancer it is possible but then this is defeating the purpose of using a product with ready made components.
Hi laksu, Here are a couple options that may help you move forward on this with cLoadBalancer. For the first option, you could enable async processing in the route by declaring a thread pool. Just add a cJavaDSLProcessor near the beginning and add the amount of threads using the '.threads()' method instead of '.to()' (e.g. .threads(20)). A second option is to set up SEDA endpoints after the cLoadBalancer to create that async behavior with configurable capacity at each endpoint.
It sounds like you want to balance dynamically based on the load at a given processing endpoint. If this is what is required, I suggest looking into the Custom load balancing strategy. The Camel load balancer documentation listed below shows how to implement. You should be able to do this in a Bean on the left in your Studio. http://camel.apache.org/async.html http://camel.apache.org/load-balancer.html I hope that helps. Best Regards, Ben
Another option would be to approach it like a QoS problem and use cLoadBalancer with the Sticky strategy. Then use a property from the message to stick messages of different sizes to different endpoints. You could use some value that indicates type of file, or also drive it from the size of the file content.
Hi Ben, I went through your suggestions and I think I learned a lot on the way about Camel and Talend but neither of them really is the solution I am looking after. That is because of how the load-balancer is designed I believe. Once the load balancer decides which branch to follow it calls the branch synchronously. That is it does wait for it to return and then picks up another message from the queue and redirects it to the other branch. Having threads(2) is not helping as it ends up calling the same branch in a second thread. That is not what I want, because branches lead to different servers and if I submit 2 jobs to one server, it will accept but then time-share between those two concurrently submitted jobs. I want one server to be dedicated to one job at one given time.
In the route, on the left column, I set a header "job" to a sequential number from 1 to 6 and then I set the body to an integer standing in for the size of the job. All jobs have a body of size 1 except the first which is of size 9. On the right is a seda input which is then multi threaded. The load balancer is a round-robin assigning jobs to processors equally. Here the equality is based on the number of jobs, not based on their content. Processors simply: - extract the header and body - print starting message - sleeps number of seconds as specified in the body - print ending message The following is output: Processor A; Starting job:; time to sleep:9 seconds Processor B; Starting job:; time to sleep:1 seconds Processor B; Ending job:; time to sleep:1 seconds Processor A; Starting job:; time to sleep:1 seconds Processor A; Ending job:; time to sleep:1 seconds Processor B; Starting job:; time to sleep:1 seconds Processor B; Ending job:; time to sleep:1 seconds Processor A; Starting job:; time to sleep:1 seconds Processor A; Ending job:; time to sleep:1 seconds Processor B; Starting job:; time to sleep:1 seconds Processor B; Ending job:; time to sleep:1 seconds Processor A; Ending job:; time to sleep:9 seconds Because of the second thread, it submits job 3 and job 5 to ProcessorA while it is still busy with job 1. What I want in this scenario is that it should use ProcessorA for job 1 and then in the meanwhile ProcessorB should be able to handle the remaining 5. Having load balancer before seda endpoints does not work again. This is because it splits the work between the two branches equally in terms of count of jobs, in my scenario 3 jobs per branch. The custom load balancer is not a good solution either. It is complicated if nothing else. I would have to keep track of which server is busy/idle in some extra variables, or poll the servers to see what they are busy with. The Sticky strategy is not an option because in that case, I would have to estimate how much a job should take up before submitting to the servers. That involves performance assessment of each server which would make things even more complicated. What feels like working is getting rid of the cLoadBalancer and configuring seda endpoint to use concurrentThreads=2. This is what I exactly want: Two threads sleep at the tail of the queue. Each grab a job exclusively (only one can grab the next job) when they are both busy with their jobs, new jobs queue up. However, with concurrentThreads, there is only one branch. This way, I must be able to tell which thread of seda endpoint I am on and at the end of the branch, redirect to different servers. Any further ideas?
Hi lasku, In the example you provided, it may help to replace the cLoadBalancer component with a cMessageRouter instead - since you're already providing concurrency through a thread pool. Right click the cMessageRouter and select 'Trigger -> When' to connect to your processors.
HOWEVER! With that being said, I'm starting to think you're simply looking for non-persistent JMS behavior without maintaining a server (like you alluded to in your first post). You can actually run a non-persistent ActiveMQ broker embedded within the route itself. If you grab a cJMSConnectionFactory component and drop it on the canvas, the default configuration will actually provide exactly this - automatically. Simply configure consumers to receive one message at a time from the broker with the prefetch limit (see below). This is an ideal fit because JMS provides the load balancing semantics I think you're looking for by the specification design. You can set the prefetch limit on the connection string by adding &jms.prefetchPolicy.queuePrefetch=1 http://activemq.apache.org/what-is-the-prefetch-limit-for.html If you were to use ActiveMQ, the left side of your attached design would produce messages through cJMS. Then each of the Processor-* components on the right would start with a cJMS component (and nothing else). Much simpler! Note: For ActiveMQ connections, I recommend checking 'Use Pooled Connection Factory' on cJMSConnectionFactory. More about the vm transport for embedded broker: http://activemq.apache.org/vm-transport-reference.html I'm interested to see if that works for you. Best Regards, Ben
To sum up what I understand: - SEDA is not a full replacement for JMS. - LoadBalancer works synchronously, which severely limits its use cases. I will check the JMS option and see how it goes. Many thanks, Levent
Hi I have checked the JMS/ActiveMQ path with prefetch=1 . Unfortunately the default configuration with "vm: ..." did not work for me as it is complaining about a temporary disk space configuration. Leaving that aside to return, I installed ActiveMQ and went through tcp based queuing with prefetch set to 1: Broker URI: "tcp://localhost:61616?jms.prefetchPolicy.queuePrefetch=1" However some very interesting phenomenon occurs: I have two consumers (Processors) as previously. 6 jobs are queued in succession. Only the first message contains 12 while the rest of the messages contain 1. A given processor parses the message content and sleeps for the number of seconds the message contains. I am expecting if 1st message is picked by Processor A then the rest of the messages should be consumed by processor B before processor A can finish the message 1. Please see the output: 11:58:32:821: Processor A; Starting job=; seconds:12 11:58:32:823: Processor B; Starting job=; seconds: 1 11:58:33:824: Processor B; Ending job=; seconds: 1 11:58:33:845: Processor B; Starting job=; seconds: 1 11:58:34:845: Processor B; Ending job=; seconds: 1 11:58:34:849: Processor B; Starting job=; seconds: 1 11:58:35:850: Processor B; Ending job=; seconds: 1 11:58:35:852: Processor B; Starting job=; seconds: 1 11:58:36:852: Processor B; Ending job=; seconds: 1 11:58:44:823: Processor A; Ending job=; seconds:12 11:58:44:824: Processor A; Starting job=; seconds: 1 11:58:45:825: Processor A; Ending job=; seconds: 1 This looks weird to me as the job(message) 3 gets stuck at somewhere but it does not get released before processor A finishes with message 1. Please note the time difference before the line that reads Processor A, ending job 1. I cannot replicate now but I think I observed once that the message 3 is consumed by Processor B , that is all same with the output above but last two lines are Processor B instead of Processor A. I suspect there must be some other configuration somewhere to ensure only one message is consumed by one consumer at one given time. Best regards, Levent
Hi Levent, Do you have two separate routes defined, each to listen on the JMS queue, or just one with a message router like before? With this configuration, a single message will be processed at each consumer. So the broker won't dispatch another message to your consumer until the first is finished processing. If you're using just one route, I suggest trying the concurrentConsumers option (&concurrentConsumers=3). If you're looking at the ActiveMQ web admin or JMX MBeans, you should see the number of consumers on the queue match what you configure. It sounds like right now it's just 1 consumer. Best Regards, Ben
Hi Ben, I am using two routes. (Please see the image.) The ultimate intention is distributing the load across servers sitting on different pieces of hardware. ConcurrentConsumers option existed in seda as well. The mechanism distributes the load as I liked but in both the problem with concurrent consumers approach is that further down the route I cannot tell which consumer I am in, and therefore I cannot dispatch the job to the hardware I wanted. Hope I am making sense. Best regards, Levent
You could also set up two separate route 'jobs' in Studio. Meaning you'd go right click and select 'New Route'. Then set up your second consumer there and run both of them at the same time. The reason for my last 2 suggestions is that I think thread concurrency is restricted for routes running in Studio, but I'm not positive. Splitting into two should take care of that.