enode
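A DDD framework
Framework overview
enode is an open-source application framework for the JVM that puts Domain-Driven Design into practice. It mainly targets cloud-native and microservice scenarios.
Release notes
CHANGELOG
Overall architecture
Built on the DDD, CQRS, ES (Event Sourcing), EDA (event-driven architecture) and In-Memory architectural styles, enode solves most of the technical problems that a CQRS architecture runs into, so developers can concentrate on business logic and business processes instead of purely technical concerns.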

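Usage constraints
- A command may modify only one aggregate root at a time
- Aggregates interact only through domain messages
- Strong consistency within an aggregate
- Eventual consistency between aggregates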
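The two Saga patterns
- Choreography: the calls, allocation, decisions and ordering among participants (sub-transactions) are driven by exchanging events. This is a decentralized pattern: participants communicate through messaging, each one listening for messages emitted by the others and then running its own follow-up logic. enode uses this pattern.
- Orchestration: a controller class coordinates the participants. The commands that carry out the transaction originate from the controller, which calls the Saga participants in logical order; after it receives feedback from one participant, it issues the call to the next. All Saga participants communicate and coordinate through this controller. Apache ServiceComb uses this pattern.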
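To make the choreography pattern concrete, here is a minimal sketch in plain Java. It is not enode code: `ChoreographyBus`, `ChoreographyDemo` and the event names are hypothetical, and the bus is a synchronous in-memory stand-in for a real message queue. The point it illustrates is that no participant calls another directly; each one only reacts to events.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Hypothetical in-memory event bus: participants know event names, never each other. */
class ChoreographyBus {
    private final Map<String, List<Consumer<String>>> listeners = new HashMap<>();
    final List<String> log = new ArrayList<>();

    void subscribe(String eventName, Consumer<String> listener) {
        listeners.computeIfAbsent(eventName, k -> new ArrayList<>()).add(listener);
    }

    void publish(String eventName, String payload) {
        log.add(eventName); // record the order in which events were published
        for (Consumer<String> listener : listeners.getOrDefault(eventName, List.of())) {
            listener.accept(payload);
        }
    }
}

class ChoreographyDemo {
    /** Wires two hypothetical participants and pushes one deposit through them. */
    static List<String> run() {
        ChoreographyBus bus = new ChoreographyBus();
        // The account participant reacts to the start of a deposit.
        bus.subscribe("DepositStarted", tx -> bus.publish("PreparationAdded", tx));
        // The transaction participant reacts to the account's event.
        bus.subscribe("PreparationAdded", tx -> bus.publish("DepositConfirmed", tx));
        bus.publish("DepositStarted", "tx-1");
        return bus.log;
    }
}
```

Under orchestration, the two `subscribe` lambdas would instead be sequential calls made by one controller class.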
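Framework features
- Implements the CQRS architecture: handles high-concurrency writes on the command (C) side, and guarantees ordering and idempotency when synchronizing data between the C and Q sides. A Command result can be returned as soon as the C side finishes, or only after both the C and Q sides have finished.
- Aggregate roots stay resident in memory (In-Memory Domain Model). The design avoids rebuilding aggregate roots wherever possible, so aggregate roots can be designed and implemented in a fully object-oriented way, without worrying about the object-relational impedance mismatch.
- Optimistic concurrency control for aggregate roots, based on a unique index over aggregate root ID + event version.
- Commands and events are routed by aggregate root ID, and aggregate roots are processed following the Actor model, which minimizes concurrency conflicts and maximizes parallelism; domain events are group-committed.
- The architecture strictly prescribes how developers write code, fits closely with DDD development, and strictly follows the principle of strong consistency within an aggregate and eventual consistency between aggregates.
- An advanced Saga mechanism: an event-driven Process Manager supports business scenarios in which one user operation spans multiple aggregate roots (order processing, for example), which avoids distributed transactions.
- C-side aggregate root state is persisted with Event Sourcing (ES), which makes C-side persistence generic and brings all the benefits of ES.
- The design is fully decoupled from any IoC container while remaining extensible; Spring Boot is currently supported.
- Scalability comes from scaling out over a distributed message queue (queue-based dynamic scale-out and scale-in). The interface abstraction is minimal and requires only the most basic queue capabilities; Kafka, RocketMQ (ONS) and Pulsar are currently supported.
- EventStore ships with adapters for JDBC, MySQL, PostgreSQL and MongoDB storage; other stores can be added by implementing the corresponding extension.
- The framework is fully reactive: it uses asynchronous drivers at the database layer and integrates Kotlin coroutines.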
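The routing feature can be pictured with a small sketch. This is an assumption-laden illustration, not enode's implementation: `AggregateRouter` and `AggregateRouterDemo` are hypothetical names, and single-threaded executors stand in for actor mailboxes. Because every message for one aggregate root ID lands on the same single thread, the aggregate can be mutated without locks, while different IDs are still processed in parallel.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

/** Hypothetical router: one single-threaded mailbox per partition. */
class AggregateRouter {
    private final ExecutorService[] mailboxes;

    AggregateRouter(int partitions) {
        mailboxes = new ExecutorService[partitions];
        for (int i = 0; i < partitions; i++) {
            mailboxes[i] = Executors.newSingleThreadExecutor();
        }
    }

    /** The same ID always maps to the same partition. */
    int partitionOf(String aggregateRootId) {
        return Math.floorMod(aggregateRootId.hashCode(), mailboxes.length);
    }

    Future<?> dispatch(String aggregateRootId, Runnable handler) {
        return mailboxes[partitionOf(aggregateRootId)].submit(handler);
    }

    void shutdown() {
        for (ExecutorService mailbox : mailboxes) {
            mailbox.shutdown();
        }
    }
}

class AggregateRouterDemo {
    /** Sends 1000 unsynchronized increments to one aggregate and returns the final value. */
    static int run() {
        AggregateRouter router = new AggregateRouter(4);
        int[] balance = {0}; // deliberately not thread-safe
        List<Future<?>> pending = new ArrayList<>();
        try {
            for (int i = 0; i < 1000; i++) {
                pending.add(router.dispatch("account-1", () -> balance[0]++));
            }
            for (Future<?> f : pending) {
                f.get(); // wait for completion; also makes the writes visible here
            }
            return balance[0];
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            router.shutdown();
        }
    }
}
```

No increment is lost even though `balance` is unsynchronized, because all 1000 tasks run one after another on the same mailbox thread.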
Best practices
- See the examples in the samples module
Projects currently built on enode: conference
Detailed introduction
Core idea
Both DDD and the CQRS architecture give domain objects behavior as well as state, but neither goes far enough, because an object's behavior is always "invoked" by something else. With an anemic model, the object supplies data for others to operate on, that is, it is used by others. With a rich model, the object supplies both data and behavior, yet it is still operated on, or used, by others.
In true object-oriented programming, an object should be a "living" entity in memory with its own initiative: it has not only state but also autonomous behavior.
- An object's state may be exposed for others to see, but it must be read-only. Nobody can modify an object's state directly; its state changes only as a result of its own behavior.
- An object's behavior is a capability that the object has. In essence, behavior should be an active response to a message. The emphasis is on "active": an object's behavior cannot be used by others; the object alone decides to exhibit it.
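One way to read the two points above in code is sketched below. The `Note` class and its `ChangeTitle` message are hypothetical illustrations, not enode types: state is exposed through read-only accessors, and the only way in is `receive`, where the object itself decides how, and whether, to react to a message.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical aggregate: readable from outside, changed only by its own reactions. */
class Note {
    private String title = "";
    private final List<String> raisedEvents = new ArrayList<>();

    /** Read-only views of the state. */
    String title() {
        return title;
    }

    List<String> raisedEvents() {
        return List.copyOf(raisedEvents);
    }

    /** The single entry point: the object actively responds to a message. */
    void receive(Object message) {
        if (message instanceof ChangeTitle) {
            ChangeTitle change = (ChangeTitle) message;
            if (change.newTitle.isBlank()) {
                return; // the object may decline to react
            }
            title = change.newTitle;
            raisedEvents.add("TitleChanged:" + title);
        }
    }

    /** Hypothetical message type. */
    static final class ChangeTitle {
        final String newTitle;

        ChangeTitle(String newTitle) {
            this.newTitle = newTitle;
        }
    }
}
```

There is no setter, so callers cannot reach past `receive` to change `title` directly.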
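Usage
enode puts a lot of effort into ease of use. The message queue and EventStore implementations are open for developers to extend, and the framework integrates closely with Spring, so it works out of the box.
Startup configuration
The @EnableEnode annotation configures the required beans automatically, which simplifies integration.
enode startup configuration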
@SpringBootApplication
@EnableEnode(value = "org.enodeframework.tests")
@ComponentScan(value = "org.enodeframework.tests")
public class App {
    public static void main(String[] args) {
        SpringApplication.run(App.class, args);
    }
}
Spring Boot configuration file
To use the tag feature of RocketMQ or ONS, set the corresponding spring.enode.mq.tag.* properties:
# enode eventstore (memory, mysql, tidb, pg, mongo)
spring.enode.eventstore=mongo
# enode messagequeue (kafka, pulsar, rocketmq, ons)
spring.enode.mq=kafka
spring.enode.mq.topic.command=EnodeBankCommandTopic
spring.enode.mq.topic.event=EnodeBankEventTopic
Kafka bean configuration
If the producer and the consumer are configured in the same config file, a circular dependency arises. To avoid it, configure them in two separate files.
producer
@Bean
public ProducerFactory<String, String> producerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_SERVER);
    props.put(ProducerConfig.RETRIES_CONFIG, 1);
    props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
    props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
    props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    return new DefaultKafkaProducerFactory<>(props);
}

@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
    return new KafkaTemplate<>(producerFactory);
}
consumer
@Value("${spring.enode.mq.topic.command}")
private String commandTopic;

@Value("${spring.enode.mq.topic.event}")
private String eventTopic;

@Bean
public ConsumerFactory<String, String> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_SERVER);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, Constants.DEFAULT_PRODUCER_GROUP);
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
    props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public KafkaMessageListenerContainer<String, String> commandListenerContainer(KafkaMessageListener commandListener, ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties properties = new ContainerProperties(commandTopic);
    properties.setGroupId(Constants.DEFAULT_CONSUMER_GROUP);
    properties.setMessageListener(commandListener);
    properties.setMissingTopicsFatal(false);
    return new KafkaMessageListenerContainer<>(consumerFactory, properties);
}

@Bean
public KafkaMessageListenerContainer<String, String> domainEventListenerContainer(KafkaMessageListener domainEventListener, ConsumerFactory<String, String> consumerFactory) {
    ContainerProperties properties = new ContainerProperties(eventTopic);
    properties.setGroupId(Constants.DEFAULT_PRODUCER_GROUP);
    properties.setMessageListener(domainEventListener);
    properties.setMissingTopicsFatal(false);
    properties.setAckMode(ContainerProperties.AckMode.MANUAL);
    return new KafkaMessageListenerContainer<>(consumerFactory, properties);
}
EventStore data source configuration; currently supported: MySQL, MongoDB, PostgreSQL, ...
public class DbConfig {
    // jdbcUrl, username, password, pgJdbcUrl, pgUsername and pgPassword are
    // fields of this class; their declarations are not shown in this excerpt.

    @Bean("enodeMongoClient")
    @ConditionalOnProperty(prefix = "spring.enode", name = "eventstore", havingValue = "mongo")
    public MongoClient mongoClient(Vertx vertx) {
        return MongoClient.create(vertx, new JsonObject().put("db_name", "test"));
    }

    @Bean("enodeMySQLPool")
    @ConditionalOnProperty(prefix = "spring.enode", name = "eventstore", havingValue = "mysql")
    public MySQLPool enodeMySQLPool() {
        // The Vert.x client expects a mysql:// URI, so the "jdbc:" prefix is stripped.
        MySQLConnectOptions connectOptions = MySQLConnectOptions.fromUri(jdbcUrl.replaceAll("jdbc:", ""))
            .setUser(username)
            .setPassword(password);
        PoolOptions poolOptions = new PoolOptions()
            .setMaxSize(5);
        return MySQLPool.pool(connectOptions, poolOptions);
    }

    @Bean("enodePgPool")
    @ConditionalOnProperty(prefix = "spring.enode", name = "eventstore", havingValue = "pg")
    public PgPool pgPool() {
        PgConnectOptions connectOptions = PgConnectOptions.fromUri(pgJdbcUrl.replaceAll("jdbc:", ""))
            .setUser(pgUsername)
            .setPassword(pgPassword);
        PoolOptions poolOptions = new PoolOptions()
            .setMaxSize(5);
        return PgPool.pool(connectOptions, poolOptions);
    }

    @Bean("enodeMySQLDataSource")
    @ConditionalOnProperty(prefix = "spring.enode", name = "eventstore", havingValue = "jdbc-mysql")
    public DataSource enodeMySQLDataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setJdbcUrl(jdbcUrl);
        dataSource.setUsername(username);
        dataSource.setPassword(password);
        dataSource.setDriverClassName(com.mysql.cj.jdbc.Driver.class.getName());
        return dataSource;
    }

    @Bean("enodePgDataSource")
    @ConditionalOnProperty(prefix = "spring.enode", name = "eventstore", havingValue = "jdbc-pg")
    public DataSource enodePgDataSource() {
        HikariDataSource dataSource = new HikariDataSource();
        dataSource.setJdbcUrl(pgJdbcUrl);
        dataSource.setUsername(pgUsername);
        dataSource.setPassword(pgPassword);
        dataSource.setDriverClassName(org.postgresql.Driver.class.getName());
        return dataSource;
    }
}
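Creating the event tables
What the tables mean
The event_stream table stores the domain event history of each aggregate root, one row per version.
The published_version table stores each aggregate root's current consumption progress (version).
Note the two unique indexes. This is a common way to implement idempotency, on the assumption that duplicate writes are rare in most cases.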
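The effect of the two unique indexes on event_stream can be sketched in memory. The class below is a hypothetical stand-in for a database table, not enode's EventStore: one set plays the role of the unique index on (aggregate_root_id, version), the other the unique index on (aggregate_root_id, command_id). An append is simply attempted, and a violation reveals either a concurrent writer or a redelivered command.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical in-memory stand-in for the event_stream table and its unique indexes. */
class InMemoryEventStream {
    enum AppendResult { SUCCESS, DUPLICATE_EVENT, DUPLICATE_COMMAND }

    private final Set<String> versionIndex = new HashSet<>(); // (aggregate_root_id, version)
    private final Set<String> commandIndex = new HashSet<>(); // (aggregate_root_id, command_id)

    synchronized AppendResult append(String aggregateRootId, int version, String commandId) {
        String commandKey = aggregateRootId + "|" + commandId;
        String versionKey = aggregateRootId + "|" + version;
        if (commandIndex.contains(commandKey)) {
            return AppendResult.DUPLICATE_COMMAND; // same command delivered again: idempotent no-op
        }
        if (versionIndex.contains(versionKey)) {
            return AppendResult.DUPLICATE_EVENT;   // another writer already took this version
        }
        commandIndex.add(commandKey);
        versionIndex.add(versionKey);
        return AppendResult.SUCCESS;
    }
}
```

A caller that receives `DUPLICATE_EVENT` has lost an optimistic-concurrency race and would typically reload the aggregate and retry; `DUPLICATE_COMMAND` means the work was already done.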
MySQL & TiDB
CREATE TABLE event_stream (
    id BIGINT AUTO_INCREMENT NOT NULL,
    aggregate_root_type_name VARCHAR(256) NOT NULL,
    aggregate_root_id VARCHAR(36) NOT NULL,
    version INT NOT NULL,
    command_id VARCHAR(36) NOT NULL,
    gmt_create DATETIME NOT NULL,
    events MEDIUMTEXT NOT NULL,
    PRIMARY KEY (id),
    UNIQUE KEY uk_aggregate_root_id_version (aggregate_root_id, version),
    UNIQUE KEY uk_aggregate_root_id_command_id (aggregate_root_id, command_id)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;

CREATE TABLE published_version (
    id BIGINT AUTO_INCREMENT NOT NULL,
    processor_name VARCHAR(128) NOT NULL,
    aggregate_root_type_name VARCHAR(256) NOT NULL,
    aggregate_root_id VARCHAR(36) NOT NULL,
    version INT NOT NULL,
    gmt_create DATETIME NOT NULL,
    PRIMARY KEY (id),
    UNIQUE KEY uk_processor_name_aggregate_root_id (processor_name, aggregate_root_id)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
PostgreSQL
CREATE TABLE event_stream (
    id bigserial,
    aggregate_root_type_name varchar(256),
    aggregate_root_id varchar(36),
    version integer,
    command_id varchar(36),
    gmt_create date,
    events text,
    PRIMARY KEY (id),
    CONSTRAINT uk_aggregate_root_id_version UNIQUE (aggregate_root_id, version),
    CONSTRAINT uk_aggregate_root_id_command_id UNIQUE (aggregate_root_id, command_id)
);

CREATE TABLE published_version (
    id bigserial,
    processor_name varchar(128),
    aggregate_root_type_name varchar(256),
    aggregate_root_id varchar(36),
    version integer,
    gmt_create date,
    PRIMARY KEY (id),
    CONSTRAINT uk_processor_name_aggregate_root_id UNIQUE (processor_name, aggregate_root_id)
);
MongoDB
db.event_stream.createIndex({aggregateRootId:1,commandId:1},{unique:true})
db.event_stream.createIndex({aggregateRootId:1,version:1},{unique:true})
db.published_version.createIndex({processorName:1,aggregateRootId:1},{unique:true})
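Programming model
Three annotations have been added. The framework only scans classes marked with @Command or @Event, and each handler method must carry the @Subscribe annotation:
@Command, @Event, @Subscribe
At startup, the annotations under the package path are scanned and the annotated classes are registered as Spring beans, which has the same effect as @Component.
When is event-driven design a good fit, and when should procedural programming be used? What is the difference between commands and events? Both are messages, so why represent them separately?
Commands can be rejected.
Events have already happened.
This is probably the most important reason. In an event-driven architecture, a raised event unquestionably represents something that has already happened.
Because a command is something we want to happen and an event is something that has happened, we should use different words when naming them: commands are usually imperative verb phrases, and events are usually past participles.
Take an order system that depends on an external payment system as an example.
After the user completes payment in the payment system, the payment system sends the order system a Command, MarkOrderAsPayed (mark the order as paid). While handling this Command, the order system loads the current order and invokes its mark-as-paid behavior, which produces the OrderPayed (order paid) event.
As we can see, commands are usually invoked from outside the system, whereas events are produced by handlers and other code inside the system.
This is another reason to represent them separately: conceptual clarity.
Commands and events are both messages, but they are really separate concepts, and concepts should be modeled explicitly.
As I understand it, both match the way people think. The brain first forms an idea or intent (Command) based on a message it receives or perceives (Event). Working out how to realize that idea is procedural thinking. Carrying it out produces further event messages, which in turn influence the brain, and the cycle repeats.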
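The order example can be sketched in plain Java to show why the two kinds of message are modeled separately. The class shapes here are assumptions for illustration only (enode's own base classes are not used): the command `MarkOrderAsPayed` is a request that the aggregate may reject, while the event `OrderPayed` records a fact, so applying it never fails.

```java
/** Hypothetical order aggregate handling the MarkOrderAsPayed command. */
class Order {
    /** Command: imperative name, expresses intent, may be rejected. */
    static final class MarkOrderAsPayed {
        final String orderId;

        MarkOrderAsPayed(String orderId) {
            this.orderId = orderId;
        }
    }

    /** Event: past-tense name, states a fact that has already happened. */
    static final class OrderPayed {
        final String orderId;

        OrderPayed(String orderId) {
            this.orderId = orderId;
        }
    }

    private final String id;
    private boolean payed;

    Order(String id) {
        this.id = id;
    }

    /** Validates the command; throws to reject it, otherwise emits and applies the event. */
    OrderPayed handle(MarkOrderAsPayed command) {
        if (payed) {
            throw new IllegalStateException("order " + id + " is already payed");
        }
        OrderPayed event = new OrderPayed(id);
        apply(event);
        return event;
    }

    /** Applying an event only records what happened; it contains no validation. */
    private void apply(OrderPayed event) {
        payed = true;
    }

    boolean isPayed() {
        return payed;
    }
}
```

All decision logic lives in `handle`; `apply` stays trivial, which is also what makes replaying stored events safe under Event Sourcing.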
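Messages
- enode currently runs handler invocations inside Kotlin coroutines. This touches on the kind of task actually being executed: there is no customizable configuration yet for CPU-bound versus IO-bound tasks, though later versions may add it. Usage is simple: mark the @Subscribe method with suspend.
- Java asynchronous programming is heavily optimized: CommandHandler and EventHandler methods may declare a CompletableFuture return value, and blocking calls are wrapped in coroutines, so blocking code such as #join() and #get() can be avoided. Kotlin suspend functions are supported as well.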
@Command
class ChangeNoteTitleCommandHandler {
    @Subscribe
    suspend fun handleAsync(context: CommandContext, command: ChangeNoteTitleCommand) {
        val note = context.get(command.getAggregateRootId(), true, Note::class.java)
        note.changeTitle(command.title)
    }
}
@Subscribe
public CompletableFuture<BankAccount> handleAsync(CommandContext context, AddTransactionPreparationCommand command) {
    // Return the chained future, so the handler completes only after the aggregate root has been changed.
    return context.getAsync(command.getAggregateRootId(), BankAccount.class)
        .thenApply(bankAccount -> {
            bankAccount.addTransactionPreparation(command.transactionId, command.transactionType, command.preparationType, command.amount);
            return bankAccount;
        });
}
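The non-blocking style can be tried out in isolation with the standard library alone. In the sketch below, `AsyncHandlerSketch`, `Account` and `loadAsync` are hypothetical stand-ins (they are not enode's `CommandContext` API); what it shows is a handler that composes on the future with `thenApply` instead of calling `join()` or `get()` inside the handler.

```java
import java.util.concurrent.CompletableFuture;

class AsyncHandlerSketch {
    /** Hypothetical aggregate used only for this illustration. */
    static final class Account {
        private long balance;

        void deposit(long amount) {
            balance += amount;
        }

        long balance() {
            return balance;
        }
    }

    /** Stand-in for an asynchronous repository lookup. */
    static CompletableFuture<Account> loadAsync(String id) {
        return CompletableFuture.supplyAsync(Account::new);
    }

    /**
     * Non-blocking handler: no join()/get() here. The returned future completes
     * only after the deposit has been applied, because it is the chained stage.
     */
    static CompletableFuture<Account> handleAsync(String id, long amount) {
        return loadAsync(id).thenApply(account -> {
            account.deposit(amount);
            return account;
        });
    }
}
```

Only the outermost caller, for example a test, should block: `AsyncHandlerSketch.handleAsync("account-1", 50).join().balance()`. An exception thrown inside the `thenApply` callback also completes the returned future exceptionally, so failures are not silently lost.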
Sending a command message:
CompletableFuture<CommandResult> future = commandService.executeAsync(createNoteCommand, CommandReturnType.EventHandled);
Command handling:
/**
 * Command handling for bank accounts
 * CommandHandler<CreateAccountCommand>,                 // open an account
 * CommandAsyncHandler<ValidateAccountCommand>,          // check whether the account is valid
 * CommandHandler<AddTransactionPreparationCommand>,     // add a transaction preparation
 * CommandHandler<CommitTransactionPreparationCommand>   // commit a transaction preparation
 */
@Command
public class BankAccountCommandHandler {
    /**
     * Open an account
     */
    @Subscribe
    public void handleAsync(CommandContext context, CreateAccountCommand command) {
        context.addAsync(new BankAccount(command.getAggregateRootId(), command.owner));
    }

    /**
     * Add a transaction preparation
     */
    @Subscribe
    public CompletableFuture<BankAccount> handleAsync(CommandContext context, AddTransactionPreparationCommand command) {
        // Return the chained future, so the handler completes only after the aggregate root has been changed.
        return context.getAsync(command.getAggregateRootId(), BankAccount.class)
            .thenApply(bankAccount -> {
                bankAccount.addTransactionPreparation(command.transactionId, command.transactionType, command.preparationType, command.amount);
                return bankAccount;
            });
    }

    /**
     * Check whether the account is valid
     */
    @Subscribe
    public void handleAsync(CommandContext context, ValidateAccountCommand command) {
        ApplicationMessage applicationMessage = new AccountValidatePassedMessage(command.getAggregateRootId(), command.transactionId);
        // A real implementation would call an external interface to validate the account.
        // Here an account is simply treated as invalid when its ID starts with "INVALID";
        // a different application-level message is returned depending on the outcome.
        if (command.getAggregateRootId().startsWith("INVALID")) {
            applicationMessage = new AccountValidateFailedMessage(command.getAggregateRootId(), command.transactionId, "The account is invalid.");
        }
        context.setApplicationMessage(applicationMessage);
    }

    /**
     * Commit a transaction preparation
     */
    @Subscribe
    public CompletableFuture<BankAccount> handleAsync(CommandContext context, CommitTransactionPreparationCommand command) {
        return context.getAsync(command.getAggregateRootId(), BankAccount.class)
            .thenApply(bankAccount -> {
                bankAccount.commitTransactionPreparation(command.transactionId);
                return bankAccount;
            });
    }
}
Domain event and Saga handling logic:
/**
 * Process manager for bank deposit transactions. It coordinates the message exchange
 * between the participating aggregate roots in the deposit transaction flow.
 * IMessageHandler<DepositTransactionStartedEvent>,               // deposit transaction started
 * IMessageHandler<DepositTransactionPreparationCompletedEvent>,  // deposit transaction committed
 * IMessageHandler<TransactionPreparationAddedEvent>,             // account preparation added
 * IMessageHandler<TransactionPreparationCommittedEvent>          // account preparation committed
 */
@Event
public class DepositTransactionProcessManager {
    @Resource
    private CommandBus commandBus;

    @Subscribe
    public CompletableFuture<Boolean> handleAsync(DepositTransactionStartedEvent evnt) {
        AddTransactionPreparationCommand command = new AddTransactionPreparationCommand(evnt.accountId, evnt.getAggregateRootId(), TransactionType.DEPOSIT_TRANSACTION, PreparationType.CREDIT_PREPARATION, evnt.amount);
        command.setId(evnt.getId());
        return commandBus.sendAsync(command);
    }

    @Subscribe
    public CompletableFuture<Boolean> handleAsync(TransactionPreparationAddedEvent evnt) {
        if (evnt.transactionPreparation.transactionType == TransactionType.DEPOSIT_TRANSACTION && evnt.transactionPreparation.preparationType == PreparationType.CREDIT_PREPARATION) {
            ConfirmDepositPreparationCommand command = new ConfirmDepositPreparationCommand(evnt.transactionPreparation.transactionId);
            command.setId(evnt.getId());
            return commandBus.sendAsync(command);
        }
        return Task.completedTask;
    }

    @Subscribe
    public CompletableFuture<Boolean> handleAsync(DepositTransactionPreparationCompletedEvent evnt) {
        CommitTransactionPreparationCommand command = new CommitTransactionPreparationCommand(evnt.accountId, evnt.getAggregateRootId());
        command.setId(evnt.getId());
        return commandBus.sendAsync(command);
    }

    @Subscribe
    public CompletableFuture<Boolean> handleAsync(TransactionPreparationCommittedEvent evnt) {
        if (evnt.transactionPreparation.transactionType == TransactionType.DEPOSIT_TRANSACTION && evnt.transactionPreparation.preparationType == PreparationType.CREDIT_PREPARATION) {
            ConfirmDepositCommand command = new ConfirmDepositCommand(evnt.transactionPreparation.transactionId);
            command.setId(evnt.getId());
            return commandBus.sendAsync(command);
        }
        return Task.completedTask;
    }
}
Configuring and starting the MQ
Three message queues are currently supported:
Pulsar
bin/pulsar standalone
Kafka
https://kafka.apache.org/quickstart
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
RocketMQ
https://rocketmq.apache.org/docs/quick-start/
Start the RocketMQ services:
nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -n 127.0.0.1:9876 &
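Starting command-web
The Command-side application of the CQRS architecture.
Its main job is to receive Commands and send them to the message queue.
Starting command-consumer
- The service that consumes messages from the Command queue
A Command counts as successfully executed only once its domain event messages have been persisted. The result of a Command can be obtained through the listener registered when the command was sent.
Starting event-consumer
- The domain event processing service
An event may be delivered more than once, so the consumer logic must be idempotent. The framework cannot do this for you; developers have to implement it themselves.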
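A common way to make an event consumer idempotent is to remember the last version handled per aggregate root, in the spirit of the published_version table. The sketch below is a hypothetical in-memory read model, not part of enode; the class, method and event names are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical read-model updater that tolerates redelivered events. */
class IdempotentProjection {
    private final Map<String, Integer> handledVersion = new HashMap<>();
    private final Map<String, Long> balances = new HashMap<>();

    /** Applies a deposit event once; returns false when it is a duplicate delivery. */
    synchronized boolean onDeposited(String aggregateRootId, int version, long amount) {
        int last = handledVersion.getOrDefault(aggregateRootId, 0);
        if (version <= last) {
            return false; // already handled: skip without side effects
        }
        balances.merge(aggregateRootId, amount, Long::sum);
        handledVersion.put(aggregateRootId, version);
        return true;
    }

    synchronized long balanceOf(String aggregateRootId) {
        return balances.getOrDefault(aggregateRootId, 0L);
    }
}
```

In a real consumer the version check and the state update would need to be persisted together, for example in one database transaction. This sketch also does not handle gaps in the version sequence.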
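Transfer example
The transfer scenario involves three aggregate roots:
- Bank deposit transaction record: represents one bank deposit transaction
- Bank transfer transaction record: represents one transfer between accounts within the bank
- Bank account aggregate root: encapsulates the data consistency of balance changes on a bank account
Testing
- OpenAPI 3.0 is integrated; just open swagger-ui.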
http://localhost:8080/swagger-ui.html
FAQ
Defining an aggregate root
An aggregate root must define a no-argument constructor, because aggregate roots are instantiated with:
aggregateRootType.getDeclaredConstructor().newInstance();
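The requirement follows from how `Class.getDeclaredConstructor()` behaves when called with no parameter types: it looks up exactly the no-argument constructor and throws `NoSuchMethodException` if none is declared. The sketch below uses hypothetical classes to show both outcomes.

```java
class ReflectiveFactory {
    /** Has a no-argument constructor next to the business constructor. */
    static class WithNoArg {
        String id;

        WithNoArg() {
        }

        WithNoArg(String id) {
            this.id = id;
        }
    }

    /** Declares only a business constructor, so reflection cannot create it this way. */
    static class WithoutNoArg {
        final String id;

        WithoutNoArg(String id) {
            this.id = id;
        }
    }

    /** The same call shape as the line quoted above. */
    static <T> T create(Class<T> type) throws ReflectiveOperationException {
        return type.getDeclaredConstructor().newInstance();
    }

    static boolean canCreate(Class<?> type) {
        try {
            create(type);
            return true;
        } catch (ReflectiveOperationException e) {
            return false; // for WithoutNoArg this is a NoSuchMethodException
        }
    }
}
```

The no-argument constructor does not have to be public for this lookup to find it, since `getDeclaredConstructor` also returns non-public constructors; whether the framework can then call it depends on access rules.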
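Why use a single asynchronous long-lived connection?
In most services today there are few providers, usually just a handful of machines, and many consumers; the whole site may be calling the service.
In our scenario, command-web needs only a few machines to serve the large volume of front-end requests, while command-consumer and event-consumer run on comparatively more machines.
With the conventional "one connection per request" approach, the provider is easily overwhelmed. A single connection ensures that one consumer cannot crush the provider; a long-lived connection cuts down on connection handshakes and authentication; and asynchronous IO with a shared thread pool guards against the C10K problem.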
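Difference between CommandHandler and CommandAsyncHandler (now unified into one)
CommandHandler exists to operate on aggregate roots in memory, so it originally involved no asynchronous operations. Later, the Handle method of CommandHandler was also redesigned as handleAsync so that the chain is asynchronous end to end; if the asynchronous chain is broken anywhere, asynchrony brings no benefit. CommandAsyncHandler exists to let developers call the interfaces of external systems, that is, to access external IO, which is why it uses `Async`.
The two interfaces, CommandHandler and CommandAsyncHandler, serve different business scenarios. After CommandHandler.handleAsync completes, the framework fetches from the context the domain events of the aggregate root that was modified and commits them. After CommandAsyncHandler.handleAsync completes, there is no such step; instead the framework looks at the asynchronous message result produced by handleAsync, that is, the IApplicationMessage. CommandAsyncHandler has since been removed: CommandHandler now handles both cases, and the asynchronous result is placed in the context by calling #setResult.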
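Difference between CommandBus sendAsync and executeAsync
sendAsync only cares about the result of sending the message.
executeAsync sends the message and also cares about the result of executing the command. It returns at one of the following points:
- CommandReturnType.CommandExecuted: returns the result once the Command has been executed and the Event has been published successfully
- CommandReturnType.EventHandled: returns the result only after the Event has been handled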
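The timing difference between the two return types can be modeled with two futures. This is a deliberately simplified, hypothetical model and not the CommandBus implementation: `ReturnTypeSketch`, its enum and its fields are assumptions made for illustration. One future completes when the command side is done, the other when an event subscriber has finished, and the caller is handed whichever one matches the requested return type.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of the two points at which a command result can be returned. */
class ReturnTypeSketch {
    enum ReturnType { COMMAND_EXECUTED, EVENT_HANDLED }

    /** Completed once the command has run and its events are published. */
    final CompletableFuture<String> commandExecuted = new CompletableFuture<>();

    /** Completed once an event subscriber has finished handling the events. */
    final CompletableFuture<String> eventHandled = new CompletableFuture<>();

    /** Hands back the future that matches how long the caller is willing to wait. */
    CompletableFuture<String> executeAsync(ReturnType returnType) {
        return returnType == ReturnType.COMMAND_EXECUTED ? commandExecuted : eventHandled;
    }
}
```

A caller that asks for `EVENT_HANDLED` waits longer, but can then rely on the event-side work having run before it continues.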
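Which event subscriber sends the handling result
An event may have many subscribers, so enode only requires that one subscriber send the result back to the sender of the command after it has handled the event. The sendEventHandledMessage parameter of defaultDomainEventMessageHandler controls whether a subscriber sends it, which ultimately determines which subscriber sends the command handling result.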
Reference projects
- https://github.com/tangxuehua/enode
- https://github.com/coffeewar/enode-master