我看了关于Spring + Activiti https://m.youtube.com/watch?v=0PV_8Lew3vg的视频。我需要让流程立即移动到Activiti调用网关的serviceTask,向队列(rabbitMQ)发送请求消息。请求消息发送后,进程停止。一旦响应消息在响应队列中,serviceTask就会再次启动。serviceTask可能需要很长时间。
我尝试了webinar中的例子,它工作得很好,但是同步。
这是我的ActivitiDemoApplication.java
package com.example;
import java.util.Map;
import javax.sql.DataSource;
import org.activiti.engine.ProcessEngine;
import org.activiti.engine.RuntimeService;
import org.activiti.engine.TaskService;
import org.activiti.engine.impl.pvm.delegate.ActivityExecution;
import org.activiti.engine.runtime.ProcessInstance;
import org.activiti.spring.integration.ActivitiInboundGateway;
import org.activiti.spring.integration.IntegrationActivityBehavior;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.boot.autoconfigure.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import org.springframework.integration.activiti.gateway.AsyncActivityBehaviorMessagingGateway;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.support.GenericHandler;
import org.springframework.integration.support.MessageBuilder;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;
@Configuration
@ComponentScan
// @EnableAutoConfiguration
@SpringBootApplication
public class ActivitiDemoApplication {
public static void main(String[] args) {
SpringApplication.run(ActivitiDemoApplication.class, args);
}
@Bean
IntegrationActivityBehavior activitiDelegate(
ActivitiInboundGateway activitiInboundGateway) {
return new IntegrationActivityBehavior(activitiInboundGateway);
}
@Bean
ActivitiInboundGateway inboundGateway(ProcessEngine processEngine) {
return new ActivitiInboundGateway(processEngine, "processed");
}
@Bean
IntegrationFlow inboundProcess(
ActivitiInboundGateway activitiInboundGateway,
PhotoService photoService) {
return IntegrationFlows.from(activitiInboundGateway)
.handle(new GenericHandler<ActivityExecution>() {
@Override
public Object handle(ActivityExecution execution,
Map<String, Object> headers) {
try {
photoService.Execute();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return MessageBuilder.withPayload(execution)
.setHeader("processed", (Object) true)
.copyHeaders(headers).build();
}
}).get();
}
@Bean
public DataSource database() {
return DataSourceBuilder
.create()
.url("jdbc:sqlserver://localhost:1433;databaseName=activiti")
.username("activiti")
.password("activiti")
.driverClassName("com.microsoft.sqlserver.jdbc.SQLServerDriver")
.build();
}
@RestController
public static class MyRestController {
@Autowired
private RuntimeService runtimeService;
@RequestMapping(value = "/start-my-process", method = RequestMethod.GET, produces = MediaType.APPLICATION_JSON_VALUE)
public void startMyProcess() {
ProcessInstance p = runtimeService
.startProcessInstanceByKey("TestProcess");
System.out.println("id: " + p.getId());
System.out.println("count: "
+ runtimeService.createProcessInstanceQuery().count());
}
}
}
@Service
@Transactional
class PhotoService {
@Autowired
private TaskService taskService;
public void Execute() throws InterruptedException {
System.out.println("debug 1");
Thread.currentThread().sleep(2000);
System.out.println("debug 2");
}
}这是我的TestProcess.bmpn20.xml
<?xml version='1.0' encoding='UTF-8'?>
<definitions xmlns="http://www.omg.org/spec/BPMN/20100524/MODEL"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:xsd="http://www.w3.org/2001/XMLSchema"
xmlns:activiti="http://activiti.org/bpmn" xmlns:bpmndi="http://www.omg.org/spec/BPMN/20100524/DI"
xmlns:omgdc="http://www.omg.org/spec/DD/20100524/DC" xmlns:omgdi="http://www.omg.org/spec/DD/20100524/DI"
typeLanguage="http://www.w3.org/2001/XMLSchema" expressionLanguage="http://www.w3.org/1999/XPath"
targetNamespace="http://www.activiti.org/processdef" xmlns:modeler="http://activiti.com/modeler"
modeler:version="1.0ev" modeler:exportDateTime="20151228174550"
modeler:modelId="969411" modeler:modelVersion="1"
modeler:modelLastUpdated="1451324745996">
<process id="TestProcess" name="TestProcess" isExecutable="true">
<startEvent id="startEvent1" />
<intermediateCatchEvent id="sid-0BAE26E1-BB48-49FD-91E4-246B85C6B8B1">
<timerEventDefinition>
<timeDuration>PT20S</timeDuration>
</timerEventDefinition>
</intermediateCatchEvent>
<endEvent id="sid-23D49CE8-B018-4ABF-871F-07F42508C98A" />
<sequenceFlow id="sid-53B3780A-A1F6-4059-9660-D56CECD236F1"
sourceRef="sid-0BAE26E1-BB48-49FD-91E4-246B85C6B8B1" targetRef="sid-23D49CE8-B018-4ABF-871F-07F42508C98A" />
<serviceTask id="task-integration" name="task integration"
activiti:delegateExpression="#{activitiDelegate}" />
<sequenceFlow id="sid-4DDAAA87-536F-4E69-BDC8-09B96DB7BD07"
sourceRef="task-integration" targetRef="sid-0BAE26E1-BB48-49FD-91E4-246B85C6B8B1" />
<sequenceFlow id="sid-3F3E9366-BAF9-40AF-9E34-E324A91291F7"
sourceRef="startEvent1" targetRef="task-integration" />
</process>
<bpmndi:BPMNDiagram id="BPMNDiagram_TestProcess">
<bpmndi:BPMNPlane bpmnElement="TestProcess" id="BPMNPlane_TestProcess">
<bpmndi:BPMNShape bpmnElement="startEvent1"
id="BPMNShape_startEvent1">
<omgdc:Bounds height="30.0" width="30.0" x="100.0" y="163.0" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="sid-0BAE26E1-BB48-49FD-91E4-246B85C6B8B1"
id="BPMNShape_sid-0BAE26E1-BB48-49FD-91E4-246B85C6B8B1">
<omgdc:Bounds height="31.0" width="31.0" x="480.0" y="162.5" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="sid-23D49CE8-B018-4ABF-871F-07F42508C98A"
id="BPMNShape_sid-23D49CE8-B018-4ABF-871F-07F42508C98A">
<omgdc:Bounds height="28.0" width="28.0" x="570.0" y="161.0" />
</bpmndi:BPMNShape>
<bpmndi:BPMNShape bpmnElement="task-integration"
id="BPMNShape_task-integration">
<omgdc:Bounds height="80.0" width="100.0" x="340.0" y="135.0" />
</bpmndi:BPMNShape>
<bpmndi:BPMNEdge bpmnElement="sid-3F3E9366-BAF9-40AF-9E34-E324A91291F7"
id="BPMNEdge_sid-3F3E9366-BAF9-40AF-9E34-E324A91291F7">
<omgdi:waypoint x="130.0" y="178.0" />
<omgdi:waypoint x="162.5" y="178.0" />
<omgdi:waypoint x="157.0" y="178.0" />
<omgdi:waypoint x="340.0" y="161.521327014218" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge bpmnElement="sid-4DDAAA87-536F-4E69-BDC8-09B96DB7BD07"
id="BPMNEdge_sid-4DDAAA87-536F-4E69-BDC8-09B96DB7BD07">
<omgdi:waypoint x="440.0" y="176.4218009478673" />
<omgdi:waypoint x="480.0062629076814" y="177.5594197983227" />
</bpmndi:BPMNEdge>
<bpmndi:BPMNEdge bpmnElement="sid-53B3780A-A1F6-4059-9660-D56CECD236F1"
id="BPMNEdge_sid-53B3780A-A1F6-4059-9660-D56CECD236F1">
<omgdi:waypoint x="512.0" y="178.5" />
<omgdi:waypoint x="540.5" y="178.5" />
<omgdi:waypoint x="540.5" y="175.0" />
<omgdi:waypoint x="570.0" y="175.0" />
</bpmndi:BPMNEdge>
</bpmndi:BPMNPlane>
</bpmndi:BPMNDiagram>
</definitions>我想要进行异步进程。
<serviceTask id="task-integration" name="task integration" activiti:delegateExpression="#{activitiDelegate}"/>当Activiti调用我的网关"#{activitiDelegate}“时,我想要将请求消息移动到RequestQueue以执行"photoService.Execute()”。然后,当收到来自ResponseQueue的响应消息时,流程再次启动。
return MessageBuilder.withPayload(execution).setHeader("processed", (Object) true).copyHeaders(headers).build();我不知道如何使用网关实现amqp
@Bean
IntegrationActivityBehavior activitiDelegate(
ActivitiInboundGateway activitiInboundGateway) {
return new IntegrationActivityBehavior(activitiInboundGateway);
}
@Bean
ActivitiInboundGateway inboundGateway(ProcessEngine processEngine) {
return new ActivitiInboundGateway(processEngine, "processed");
}
@Bean
IntegrationFlow inboundProcess(
ActivitiInboundGateway activitiInboundGateway,
PhotoService photoService) {
return IntegrationFlows.from(activitiInboundGateway)
.handle(new GenericHandler<ActivityExecution>() {
@Override
public Object handle(ActivityExecution execution,
Map<String, Object> headers) {
try {
photoService.Execute();
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
return MessageBuilder.withPayload(execution)
.setHeader("processed", (Object) true)
.copyHeaders(headers).build();
}
}).get();
}我很抱歉,但我在java spring集成方面做得很好。我希望我的解释是清楚的。
发布于 2015-12-29 00:57:42
对不起,您的问题不清楚,特别是对于那些不熟悉所提到的框架之一的人。
您可以说serviceTask,但不要在代码中显示它。我只能猜测您谈论的是photoService,它无论如何都不会在下面的IntegrationFlow中调用。这也让人更加困惑。
我只能说ActivitiInboundGateway是阻塞的组件(参见ActivitiInboundGateway.execute()):
Message<?> reply = sendAndReceiveMessage(mb.build());这完全是在阻止request/reply行为。我们可以从你的IntegrationFlow定义中看到它的正确调用。
请分享更多信息以了解此案例。
更新
我想使异步进程。
看起来你可以扩展IntegrationActivityBehavior
public class AsyncIntegrationActivityBehavior extends IntegrationActivityBehavior {
private TaskExecutor executor = new ThreadPoolTaskExecutor();
@Override
public void execute(final ActivityExecution execution) throws Exception {
this.executor(new Runnable() {
public void void run() {
AsyncIntegrationActivityBehavior.super.execute(execution);
}
});
}
}并将其用于您的activitiDelegate @Bean。
https://stackoverflow.com/questions/34484335
复制相似问题