@Component public class StreamProducer { //通过StreamBridge发送消息 @Autowired private StreamBridge streamBridge;
public static String CLUSTER MESSAGE OUTPUT = "cluster-out-0";//不依赖具体消息中间件实现 public void sendEvent(Event event){ Message<Event> message = new GenericMessage<>(event); streamBridge,send(CLUSTER MESSAGE OUTPUT, message); } }
// Excerpt of StreamBridge: only the beginning of send(...) is shown; the method and class bodies are not closed in this excerpt.
// The visible part of send(...) wraps `data` via MessageBuilder.withPayload(data).build() when it is not already a Message.
// NOTE(review): `object` and `MimeTypeoutputContentType` look like transcription errors for `Object` and `MimeType outputContentType` — confirm against the original source.
public final class StreamBridge implements SmartInitializingSingleton { public boolean send(String bindingName, @Nullable String binderName, object data, MimeTypeoutputContentType){ if(!(data instanceof Message)){ data =MessageBuilder.withPayload(data).build(); }
@Component public class StreamDelayProducer { @Autowired private StreamBridge streamBridge; public static String CLUSTER_MESSAGE_OUTPUT ="cluster-out-0";
public void sendEvent(Event event){ Map<String,Object> headers = new HashMap<>(); // MessageConst.PROPERTY_DELAY_TIME_LEVEL RocketMQ组件 headers.put(MessageConst.PROPERTY_DELAY_TIME_LEVEL, 4); Message<Event> message = new GenericMessage<>(event, headers); streamBridge.send(CLUSTER_MESSAGE_OUTPUT, message); } }