enode
enode copied to clipboard
DDD 框架
æ¡æ¶ç®ä»
enode
æ¯åºäºJVM
å¹³å°ï¼éç¨Domain Driven Design
ææ³è½å°çä¸ä¸ªOpen Source
åºç¨æ¡æ¶ï¼ä¸»è¦æå¡äºäºåçåå¾®æå¡åºæ¯ã
åçè®°å½
CHANGELOG
æ´ä½æ¶æ
åºäºãDDD
ããCQRS
ããES
ããEDA
ããIn-Memory
ãæ¶æé£æ ¼ï¼å®ç°äºCQRS
æ¶æé¢ä¸´ç大é¨åææ¯é®é¢ï¼è®©å¼åè
å¯ä»¥ä¸æ³¨äºä¸å¡é»è¾åä¸å¡æµç¨çå¼åï¼èæ éå
³å¿çº¯ææ¯é®é¢ã
使ç¨çº¦æ
- ä¸ä¸ªå½ä»¤ä¸æ¬¡åªè½ä¿®æ¹ä¸ä¸ªèåæ ¹
- èåé´åªè½éè¿é¢åæ¶æ¯äº¤äº
- èåå 强ä¸è´æ§
- èåé´æç»ä¸è´æ§
Saga
ç两ç§æ¨¡å¼
- ç¼æï¼
Choreography
ï¼ åä¸è ï¼åäºå¡ï¼ä¹é´çè°ç¨ãåé ãå³çåæåºï¼éè¿äº¤æ¢äºä»¶è¿è¡è¿è¡ãæ¯ä¸ç§å»ä¸å¿åç模å¼ï¼åä¸è ä¹é´éè¿æ¶æ¯æºå¶è¿è¡æ²éï¼éè¿çå¬å¨çæ¹å¼çå¬å ¶ä»åä¸è ååºçæ¶æ¯ï¼ä»èæ§è¡åç»çé»è¾å¤çã
enode
ä¸ä½¿ç¨çå°±æ¯è¿ç§æ¨¡å¼
- æ§å¶ï¼
Orchestration
ï¼ æä¾ä¸ä¸ªæ§å¶ç±»ï¼æ¹ä¾¿åä¸è ä¹é´çåè°å·¥ä½ãäºå¡æ§è¡çå½ä»¤ä»æ§å¶ç±»åèµ·ï¼æç §é»è¾é¡ºåºè¯·æ±Saga
çåä¸è ï¼ä»åä¸è é£éæ¥åå°åé¦ä»¥åï¼æ§å¶ç±»å¨åèµ·åå ¶ä»åä¸è çè°ç¨ãææSaga
çåä¸è é½å´ç»è¿ä¸ªæ§å¶ç±»è¿è¡æ²éååè°å·¥ä½ã
Apache ServiceComb
使ç¨çæ¯è¿ç§æ¨¡å¼
æ¡æ¶ç¹è²
- å®ç°
CQRS
æ¶æï¼è§£å³CQRS
æ¶æçC
端çé«å¹¶ååçé®é¢ï¼ä»¥åCQ
两端æ°æ®åæ¥ç顺åºæ§ä¿è¯åå¹çæ§ï¼æ¯æC
端å®æåç«å³è¿åCommand
çç»æï¼ä¹æ¯æCQ
两端é½å®æåæè¿åCommand
çç»æ - èåæ ¹å¸¸é©»å
åï¼
In-Memory Domain Model
ï¼ï¼è®¾è®¡ä¸å°½å¯è½çé¿å äºèåæ ¹é建ï¼å¯ä»¥å®å ¨ä»¥OO
çæ¹å¼æ¥è®¾è®¡å®ç°èåæ ¹ï¼ä¸å¿ 为ORM
çé»æ失衡èç¦æ¼ - åºäºèåæ ¹
ID
+ äºä»¶çæ¬å·çå¯ä¸ç´¢å¼ï¼å®ç°èåæ ¹çä¹è§å¹¶åæ§å¶ - éè¿èåæ ¹
ID
对å½ä»¤æäºä»¶è¿è¡è·¯ç±ï¼èåæ ¹çå¤çåºäºActor
ææ³ï¼åå°æå°ç并åå²çªãæ大ç并è¡å¤çï¼Group Commit Domain event
- æ¶æå±é¢ä¸¥æ ¼è§èäºå¼å人å该å¦ä½å代ç ï¼å
DDD
å¼åç´§å¯ç»åï¼ä¸¥æ ¼éµå®èåå 强ä¸è´æ§ãèåä¹é´æç»ä¸è´æ§çåå - å
è¿ç
Saga
æºå¶ï¼ä»¥äºä»¶é©±å¨çæµç¨ç®¡çå¨ï¼Process Manager
ï¼çæ¹å¼æ¯æä¸ä¸ªç¨æ·æä½è·¨å¤ä¸ªèåæ ¹çä¸å¡åºæ¯ï¼å¦è®¢åå¤çï¼ä»èé¿å åå¸å¼äºå¡çä½¿ç¨ - åºäº
ES
ï¼Event Sourcing
ï¼çææ³æä¹ åC
端çèåæ ¹çç¶æï¼è®©C
端çæ°æ®æä¹ ååå¾éç¨åï¼å ·æä¸åES
çä¼ç¹ - å¨è®¾è®¡ä¸å®å ¨ä¸IoC容å¨è§£è¦ï¼åæ¶ä¿çäºæ©å±æ§ï¼ç®åéé äºSpringBoot
- éè¿åºäºåå¸å¼æ¶æ¯éå横åæ©å±çæ¹å¼å®ç°ç³»ç»çå¯ä¼¸ç¼©æ§ï¼åºäºéåçå¨ææ©å®¹/缩容ï¼ï¼æ¥å£æ½è±¡æç®ï¼åªè¦æ±æåºç¡çéåè½åï¼ç®åéé
äº
Kafka
ãRocketMQï¼ONSï¼
ãPulsar
- EventStoreå
ç½®éé
äº
JDBC
ãMySQL
ãPostgreSQL
ãMongoDB
åå¨ï¼å¯é对æ§å®ç°å¯¹åºæ©å± - æ¡æ¶å®å
¨éç¨ååºå¼ç¼ç¨ç念ï¼å¨
db
å±é¢ä½¿ç¨äºå¼æ¥é©±å¨ï¼åæ¶éæäºkotlin coroutine
æä½³å®è·µ
- å¯åèsamples模åä¸çä¾å
ç®ååºäºenodeå¼åçé¡¹ç® conference
详ç»ä»ç»
æ ¸å¿ææ³
ä¸ç®¡æ¯DDD
ä¹å¥½ï¼CQRS
æ¶æä¹å¥½ï¼è½ç¶é½åå°äºè®©é¢å对象ä¸ä»
æç¶æï¼èä¸æè¡ä¸ºï¼ä½è¿ä¸å¤å½»åºãå 为对象çè¡ä¸ºæ»æ¯â被è°ç¨âçãå 为贫è¡æ¨¡åçæ
åµä¸ï¼å¯¹è±¡æ¯æä¾äºæ°æ®è®©å«äººå»æä½æè
说被å«äººä½¿ç¨ï¼èå
è¡æ¨¡åçæ
åµä¸ï¼å¯¹è±¡åæ¯æä¾äºæ°æ®åè¡ä¸ºï¼ä½è¿æ¯è®©å«äººå»æä½æè
说被å«äººä½¿ç¨ã
çæ£çé¢å对象ç¼ç¨ä¸ç对象åºè¯¥æ¯ä¸ä¸ªâæ´»âçå ·æ主è§è½å¨æ§çåå¨äºå åä¸ç客è§åå¨ï¼å®ä»¬ä¸ä» æç¶æèä¸è¿æèªä¸»è¡ä¸ºã
- 对象çç¶æå¯ä»¥è¡¨ç°åºæ¥è¢«å«äººçå°ï¼ä½æ¯å¿ é¡»æ¯åªè¯»çï¼æ²¡æ人å¯ä»¥ç´æ¥å»ä¿®æ¹ä¸ä¸ªå¯¹è±¡çç¶æï¼å®çç¶æå¿ é¡»æ¯ç±å®èªå·±çè¡ä¸ºå¯¼è´èªå·±çç¶æçæ¹åã
- 对象çè¡ä¸ºå°±æ¯å¯¹è±¡æå ·æçæç§åè½ã对象çè¡ä¸ºæ¬è´¨ä¸åºè¯¥æ¯å¯¹æ个æ¶æ¯ç主å¨ååºï¼è¿é强è°çæ¯ä¸»å¨ï¼å°±æ¯è¯´å¯¹è±¡çè¡ä¸ºä¸å¯ä»¥è¢«å«äººä½¿ç¨ï¼èåªè½èªå·±ä¸»å¨çå»è¡¨ç°åºè¯¥è¡ä¸ºã
使ç¨è¯´æ
enode
å¨ä½¿ç¨ä¾¿å©æ§äºåäºå¾å¤å°è¯ååªåï¼èä¸é对æ¶æ¯éååEventStore
çå®ç°å¯¹å¼åè
é½æ¯å¼æ¾çï¼åæ¶åSpring
é«åº¦éæï¼å¼ç®±å³ç¨ã
å¯å¨é ç½®
æ°å¢@EnableEnode
注解ï¼å¯èªå¨é
ç½®Bean
ï¼ç®åäºæ¥å
¥æ¹å¼ã
enode
å¯å¨é
ç½®
@SpringBootApplication
@EnableEnode(value = "org.enodeframework.tests")
@ComponentScan(value = "org.enodeframework.tests")
public class App {
public static void main(String[] args) {
SpringApplication.run(App.class, args);
}
}
Spring Boot
å¯å¨é
ç½®æ件
å¦æéè¦ä½¿ç¨RokcetMQ
åONS
çtag
åè½ï¼ç¸åºçé
ç½®spring.enode.mq.tag.*
å±æ§å³å¯ï¼
# enode eventstore (memory, mysql, tidb, pg, mongo)
spring.enode.eventstore=mongo
# enode messagequeue (kafka, pulsar, rocketmq, ons)
spring.enode.mq=kafka
spring.enode.mq.topic.command=EnodeBankCommandTopic
spring.enode.mq.topic.event=EnodeBankEventTopic
kafka bean
é
ç½®
å¦ææçæè åæ¶è´¹è é ç½®å¨ä¸ä¸ªconfigæ件ä¸ï¼è¿éä¼äº§çåå¨ä¸ä¸ªå¾ªç¯ä¾èµï¼ä¸ºäºé¿å è¿ç§æ åµï¼å»ºè®®åå¼ä¸¤ä¸ªæ件é ç½®
producer
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_SERVER);
props.put(ProducerConfig.RETRIES_CONFIG, 1);
props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1024000);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(props);
}
@Bean
public KafkaTemplate<String, String> kafkaTemplate(ProducerFactory<String, String> producerFactory) {
return new KafkaTemplate<>(producerFactory);
}
consumer
@Value("${spring.enode.mq.topic.command}")
private String commandTopic;
@Value("${spring.enode.mq.topic.event}")
private String eventTopic;
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_SERVER);
props.put(ConsumerConfig.GROUP_ID_CONFIG, Constants.DEFAULT_PRODUCER_GROUP);
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(props);
}
@Bean
public KafkaMessageListenerContainer<String, String> commandListenerContainer(KafkaMessageListener commandListener, ConsumerFactory<String, String> consumerFactory) {
ContainerProperties properties = new ContainerProperties(commandTopic);
properties.setGroupId(Constants.DEFAULT_CONSUMER_GROUP);
properties.setMessageListener(commandListener);
properties.setMissingTopicsFatal(false);
return new KafkaMessageListenerContainer<>(consumerFactory, properties);
}
@Bean
public KafkaMessageListenerContainer<String, String> domainEventListenerContainer(KafkaMessageListener domainEventListener, ConsumerFactory<String, String> consumerFactory) {
ContainerProperties properties = new ContainerProperties(eventTopic);
properties.setGroupId(Constants.DEFAULT_PRODUCER_GROUP);
properties.setMessageListener(domainEventListener);
properties.setMissingTopicsFatal(false);
properties.setAckMode(ContainerProperties.AckMode.MANUAL);
return new KafkaMessageListenerContainer<>(consumerFactory, properties);
}
eventstore
æ°æ®æºé
ç½®ï¼ç®åæ¯æ(MySQL
MongoDB
PostgreSQL
...ï¼
public class DbConfig {
@Bean("enodeMongoClient")
@ConditionalOnProperty(prefix = "spring.enode", name = "eventstore", havingValue = "mongo")
public MongoClient mongoClient(Vertx vertx) {
return MongoClient.create(vertx, new JsonObject().put("db_name", "test"));
}
@Bean("enodeMySQLPool")
@ConditionalOnProperty(prefix = "spring.enode", name = "eventstore", havingValue = "mysql")
public MySQLPool enodeMySQLPool() {
MySQLConnectOptions connectOptions = MySQLConnectOptions.fromUri(jdbcUrl.replaceAll("jdbc:", ""))
.setUser(username)
.setPassword(password);
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
return MySQLPool.pool(connectOptions, poolOptions);
}
@Bean("enodePgPool")
@ConditionalOnProperty(prefix = "spring.enode", name = "eventstore", havingValue = "pg")
public PgPool pgPool() {
PgConnectOptions connectOptions = PgConnectOptions.fromUri(pgJdbcUrl.replaceAll("jdbc:", ""))
.setUser(pgUsername)
.setPassword(pgPassword);
PoolOptions poolOptions = new PoolOptions()
.setMaxSize(5);
return PgPool.pool(connectOptions, poolOptions);
}
@Bean("enodeMySQLDataSource")
@ConditionalOnProperty(prefix = "spring.enode", name = "eventstore", havingValue = "jdbc-mysql")
public DataSource enodeMySQLDataSource() {
HikariDataSource dataSource = new HikariDataSource();
dataSource.setJdbcUrl(jdbcUrl);
dataSource.setUsername(username);
dataSource.setPassword(password);
dataSource.setDriverClassName(com.mysql.cj.jdbc.Driver.class.getName());
return dataSource;
}
@Bean("enodePgDataSource")
@ConditionalOnProperty(prefix = "spring.enode", name = "eventstore", havingValue = "jdbc-pg")
public DataSource enodePgDataSource() {
HikariDataSource dataSource = new HikariDataSource();
dataSource.setJdbcUrl(pgJdbcUrl);
dataSource.setUsername(pgUsername);
dataSource.setPassword(pgPassword);
dataSource.setDriverClassName(org.postgresql.Driver.class.getName());
return dataSource;
}
}
äºä»¶è¡¨æ°å»º
表çå«ä¹
event_stream
表ä¸åå¨çæ¯æ¯ä¸ªèåæ ¹å对åºçæ¬çé¢åäºä»¶åå²è®°å½
published_version
表ä¸åå¨çæ¯ä¸ªèåæ ¹å½åçæ¶è´¹è¿åº¦ï¼çæ¬ï¼
注ææ两个å¯ä¸ç´¢å¼ï¼è¿ä¸ªæ¯å®ç°å¹çç常ç¨æè·¯ï¼å 为æ们认为大é¨åæ åµä¸ä¸ä¼åºç°éå¤åé®é¢
MySQL
& TiDB
CREATE TABLE event_stream (
id BIGINT AUTO_INCREMENT NOT NULL,
aggregate_root_type_name VARCHAR(256) NOT NULL,
aggregate_root_id VARCHAR(36) NOT NULL,
version INT NOT NULL,
command_id VARCHAR(36) NOT NULL,
gmt_create DATETIME NOT NULL,
events MEDIUMTEXT NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY uk_aggregate_root_id_version (aggregate_root_id, version),
UNIQUE KEY uk_aggregate_root_id_command_id (aggregate_root_id, command_id)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
CREATE TABLE published_version (
id BIGINT AUTO_INCREMENT NOT NULL,
processor_name VARCHAR(128) NOT NULL,
aggregate_root_type_name VARCHAR(256) NOT NULL,
aggregate_root_id VARCHAR(36) NOT NULL,
version INT NOT NULL,
gmt_create DATETIME NOT NULL,
PRIMARY KEY (id),
UNIQUE KEY uk_processor_name_aggregate_root_id (processor_name, aggregate_root_id)
) ENGINE = InnoDB DEFAULT CHARSET = utf8mb4;
postgresql
CREATE TABLE event_stream (
id bigserial,
aggregate_root_type_name varchar(256),
aggregate_root_id varchar(36),
version integer,
command_id varchar(36),
gmt_create date,
events text,
PRIMARY KEY (id),
CONSTRAINT uk_aggregate_root_id_version UNIQUE (aggregate_root_id, version),
CONSTRAINT uk_aggregate_root_id_command_id UNIQUE (aggregate_root_id, command_id)
);
CREATE TABLE published_version (
id bigserial,
processor_name varchar(128),
aggregate_root_type_name varchar(256),
aggregate_root_id varchar(36),
version integer,
gmt_create date,
PRIMARY KEY (id),
CONSTRAINT uk_processor_name_aggregate_root_id UNIQUE (processor_name, aggregate_root_id)
);
MongoDB
db.event_stream.createIndex({aggregateRootId:1,commandId:1},{unique:true})
db.event_stream.createIndex({aggregateRootId:1,version:1},{unique:true})
db.published_version.createIndex({processorName:1,aggregateRootId:1},{unique:true})
ç¼ç¨æ¨¡å
æ°å¢äºä¸ä¸ªæ³¨è§£ï¼ç³»ç»éå®äºåªæ«æ@Command
å@Event
æ è¯çç±»ï¼æ§è¡çæ¹æ³ä¸éè¦æ·»å @Subscribe
注解ï¼
-
@Command
-
@Event
-
@Subscribe
å¯å¨æ¶ä¼æ«æå
è·¯å¾ä¸ç注解ï¼æ³¨åæSpring Bean
ï¼å@Component
ä½ç¨ç¸åã
ä»ä¹æ¶åéåäºä»¶é©±å¨ï¼ä»ä¹æ¶å使ç¨è¿ç¨å¼ç¼ç¨ï¼å½ä»¤åäºä»¶çåºå«ï¼ä¸¤è é½æ¯æ¶æ¯ï¼ä¸ºä»ä¹è¦åå¼è¡¨ç¤ºå¢ï¼
å½ä»¤å¯ä»¥è¢«æç»ã
äºä»¶å·²ç»åçã
è¿å¯è½æ¯æéè¦çåå ãå¨äºä»¶é©±å¨çä½ç³»ç»æä¸ï¼æ¯«æ çé®ï¼å¼åçäºä»¶ä»£è¡¨äºå·²åççäºæ
ã
ç°å¨ï¼å 为å½ä»¤æ¯æ们æ³è¦åççäºæ
ï¼å¹¶ä¸äºä»¶å·²ç»åçäºï¼æ以å½æ们å½åè¿äºäºæ
æ¶ï¼æ们åºè¯¥ä½¿ç¨ä¸åçè¯ï¼å½ä»¤ä¸è¬æ¯åè¯ï¼äºä»¶ä¸è¬æ¯è¿å»åè¯
举个ä¾åï¼æ¿è®¢åç³»ç»æ¥è¯´ï¼æ们æ个å¤é¨æ¯ä»ç³»ç»çä¾èµã
å½ç¨æ·å¨æ¯ä»ç³»ç»å®ææ¯ä»åï¼æ¯ä»ç³»ç»ä¼å订åç³»ç»åéä¸ä¸ªCommand
ï¼MarkOrderAsPayed
ï¼æ 记订åå·²æ¯ä»ï¼ï¼è®¢åå¨å¤çè¿ä¸ªCommand
æ¶ï¼è·åå½å订åï¼è°ç¨è®¢åçæ è®°å·²æ¯ä»ï¼è¡ä¸ºï¼ï¼äº§çäºOrderPayed
ï¼è®¢åå·²æ¯ä»ï¼äºä»¶ã
æ们å¯ä»¥çå°ï¼å½ä»¤é常ç±ç³»ç»å¤è°ç¨ï¼äºä»¶æ¯ç±å¤çç¨åºåç³»ç»ä¸çå
¶ä»ä»£ç æä¾çã
è¿æ¯ä»ä»¬åå¼è¡¨ç¤ºçå¦ä¸ä¸ªåå ãæ¦å¿µæ¸
æ°åº¦ã
å½ä»¤åäºä»¶é½æ¯æ¶æ¯ãä½å®ä»¬å®é
ä¸æ¯ç¬ç«çæ¦å¿µï¼åºè¯¥æç¡®å°å¯¹æ¦å¿µè¿è¡å»ºæ¨¡ã
è¿ä¸¤è
æç解é½æ¯ç¬¦å人类æç»´çï¼é¦å
æ¯åºäºå¤§èæ¥æ¶å°æç¥å°çæ¶æ¯ï¼Event
ï¼äº§çä¸ä¸ªæ³æ³ãæå¾ãï¼Command
ï¼ï¼ç¶åå¦ä½å®ç°è¿ä¸ªæ³æ³ï¼æèç维度æ¯è¿ç¨å¼çï¼å¨å®ç°çè¿ç¨ä¸ï¼ä¼äº§çä¸äºäºä»¶æ¶æ¯ï¼è¿ä¸ªæ¶æ¯åä¼å½±åå°å¤§èãå¦æ¤å¾ªç¯å¾å¤
æ¶æ¯
-
ç®åenodeå½æ°è°ç¨çå®ç°æ¯æ¾å¨
kotlin coroutine
ä¸æ¥æ§è¡çï¼è¿éæ¶åå°å®é æ§è¡çä»»å¡ç±»åï¼é对计ç®å¯éååIOå¯éåçä»»å¡ï¼ç®å没æåå¯å®å¶åçé ç½®ï¼åç»ççæ¬ä¼èèå ä¸ï¼ 使ç¨ä¹å¾ç®åï¼@Subscribe
æ¹æ³ä½å ä¸suspend
æ è®°å³å¯ã -
é对
Java
å¼æ¥ç¼ç¨åäºæ·±åº¦ä¼åï¼æ¯æCommandHandler
åEventHandler
ä¸å®ä¹CompletableFuture
è¿åå¼ï¼é»å¡è°ç¨å°è£ å¨åç¨ä¸ï¼é¿å 使ç¨#join() #get()
çé»å¡ä»£ç ï¼åæ¶ä¹æ¯ækotlin suspend
@Command
class ChangeNoteTitleCommandHandler {
@Subscribe
suspend fun handleAsync(context: CommandContext, command: ChangeNoteTitleCommand) {
val note = context.get(command.getAggregateRootId(), true, Note::class.java)
note.changeTitle(command.title)
}
}
@Subscribe
public CompletableFuture<BankAccount> handleAsync(CommandContext context, AddTransactionPreparationCommand command) {
CompletableFuture<BankAccount> future = context.getAsync(command.getAggregateRootId(), BankAccount.class);
future.thenAccept(bankAccount -> {
bankAccount.addTransactionPreparation(command.transactionId, command.transactionType, command.preparationType, command.amount);
});
return future;
}
åéå½ä»¤æ¶æ¯ï¼
CompletableFuture<CommandResult> future = commandService.executeAsync(createNoteCommand, CommandReturnType.EventHandled);
å½ä»¤å¤çï¼
/**
* é¶è¡è´¦æ·ç¸å
³å½ä»¤å¤ç
* CommandHandler<CreateAccountCommand>, //å¼æ·
* CommandAsyncHandler<ValidateAccountCommand>, //éªè¯è´¦æ·æ¯å¦åæ³
* CommandHandler<AddTransactionPreparationCommand>, //æ·»å é¢æä½
* CommandHandler<CommitTransactionPreparationCommand> //æ交é¢æä½
*/
@Command
public class BankAccountCommandHandler {
/**
* å¼æ·
*/
@Subscribe
public void handleAsync(CommandContext context, CreateAccountCommand command) {
context.addAsync(new BankAccount(command.getAggregateRootId(), command.owner));
}
/**
* æ·»å é¢æä½
*/
@Subscribe
public CompletableFuture<BankAccount> handleAsync(CommandContext context, AddTransactionPreparationCommand command) {
CompletableFuture<BankAccount> future = context.getAsync(command.getAggregateRootId(), BankAccount.class);
future.thenAccept(bankAccount -> {
bankAccount.addTransactionPreparation(command.transactionId, command.transactionType, command.preparationType, command.amount);
});
return future;
}
/**
* éªè¯è´¦æ·æ¯å¦åæ³
*/
@Subscribe
public void handleAsync(CommandContext context, ValidateAccountCommand command) {
ApplicationMessage applicationMessage = new AccountValidatePassedMessage(command.getAggregateRootId(), command.transactionId);
//æ¤å¤åºè¯¥ä¼è°ç¨å¤é¨æ¥å£éªè¯è´¦å·æ¯å¦åæ³ï¼è¿éä»
ä»
ç®åéè¿è´¦å·æ¯å¦ä»¥INVALIDå符串å¼å¤´æ¥å¤ææ¯å¦åæ³ï¼æ ¹æ®è´¦å·çåæ³æ§ï¼è¿åä¸åçåºç¨å±æ¶æ¯
if (command.getAggregateRootId().startsWith("INVALID")) {
applicationMessage = new AccountValidateFailedMessage(command.getAggregateRootId(), command.transactionId, "è´¦æ·ä¸åæ³.");
}
context.setApplicationMessage(applicationMessage);
}
/**
* æ交é¢æä½
*/
@Subscribe
public CompletableFuture<BankAccount> handleAsync(CommandContext context, CommitTransactionPreparationCommand command) {
CompletableFuture<BankAccount> future = context.getAsync(command.getAggregateRootId(), BankAccount.class);
future.thenAccept(bankAccount -> {
bankAccount.commitTransactionPreparation(command.transactionId);
});
return future;
}
}
é¢åäºä»¶åSagas
å¤çé»è¾ï¼
/**
* é¶è¡å款交ææµç¨ç®¡çå¨ï¼ç¨äºåè°é¶è¡å款交ææµç¨ä¸å个åä¸è
èåæ ¹ä¹é´çæ¶æ¯äº¤äº
* IMessageHandler<DepositTransactionStartedEvent>, //å款交æå·²å¼å§
* IMessageHandler<DepositTransactionPreparationCompletedEvent>, //å款交æå·²æ交
* IMessageHandler<TransactionPreparationAddedEvent>, //è´¦æ·é¢æä½å·²æ·»å
* IMessageHandler<TransactionPreparationCommittedEvent> //è´¦æ·é¢æä½å·²æ交
*/
@Event
public class DepositTransactionProcessManager {
@Resource
private CommandBus commandBus;
@Subscribe
public CompletableFuture<Boolean> handleAsync(DepositTransactionStartedEvent evnt) {
AddTransactionPreparationCommand command = new AddTransactionPreparationCommand(evnt.accountId, evnt.getAggregateRootId(), TransactionType.DEPOSIT_TRANSACTION, PreparationType.CREDIT_PREPARATION, evnt.amount);
command.setId(evnt.getId());
return commandBus.sendAsync(command);
}
@Subscribe
public CompletableFuture<Boolean> handleAsync(TransactionPreparationAddedEvent evnt) {
if (evnt.transactionPreparation.transactionType == TransactionType.DEPOSIT_TRANSACTION && evnt.transactionPreparation.preparationType == PreparationType.CREDIT_PREPARATION) {
ConfirmDepositPreparationCommand command = new ConfirmDepositPreparationCommand(evnt.transactionPreparation.transactionId);
command.setId(evnt.getId());
return commandBus.sendAsync(command);
}
return Task.completedTask;
}
@Subscribe
public CompletableFuture<Boolean> handleAsync(DepositTransactionPreparationCompletedEvent evnt) {
CommitTransactionPreparationCommand command = new CommitTransactionPreparationCommand(evnt.accountId, evnt.getAggregateRootId());
command.setId(evnt.getId());
return (commandBus.sendAsync(command));
}
@Subscribe
public CompletableFuture<Boolean> handleAsync(TransactionPreparationCommittedEvent evnt) {
if (evnt.transactionPreparation.transactionType == TransactionType.DEPOSIT_TRANSACTION && evnt.transactionPreparation.preparationType == PreparationType.CREDIT_PREPARATION) {
ConfirmDepositCommand command = new ConfirmDepositCommand(evnt.transactionPreparation.transactionId);
command.setId(evnt.getId());
return (commandBus.sendAsync(command));
}
return Task.completedTask;
}
}
MQ
é
ç½®å¯å¨
ç®åæ¯æä¸ç§
Pulsar
bin/pulsar standalone
Kafka
https://kafka.apache.org/quickstart
bin/zookeeper-server-start.sh config/zookeeper.properties
bin/kafka-server-start.sh config/server.properties
RocketMQ
https://rocketmq.apache.org/docs/quick-start/
å¯å¨RocketMQ
æå¡ï¼
nohup sh bin/mqnamesrv &
nohup sh bin/mqbroker -n 127.0.0.1:9876 &
command-web
å¯å¨
-
CQRS
æ¶æä¸çCommand
端åºç¨
主è¦ç¨æ¥æ¥æ¶
Command
ï¼å°Command
åéå°æ¶æ¯éåã
command-consumer
å¯å¨
- æ¶è´¹
Command
éåä¸çæ¶æ¯çæå¡
å°é¢åäºä»¶æ¶æ¯æä¹ åæç®æ¯
Command
æ§è¡æåï¼Command
æ§è¡çç»æå¯ä»¥éè¿åéå½ä»¤æ¶æ³¨åççå¬å¨è·åã
event-consumer
å¯å¨
- é¢åäºä»¶å¤çæå¡
äºä»¶å¯è½ä¼å¤æ¬¡æéï¼æ以éè¦æ¶è´¹ç«¯é»è¾ä¿è¯å¹çå¤çï¼è¿éæ¡æ¶æ æ³å®ææ¯æï¼éè¦å¼åè èªå·±å®ç°ã
转账çä¾å
转账çä¸å¡åºæ¯ï¼æ¶åäºä¸ä¸ªèåæ ¹ï¼
- é¶è¡å款交æè®°å½ï¼è¡¨ç¤ºä¸ç¬é¶è¡å款交æ
- é¶è¡è½¬è´¦äº¤æè®°å½ï¼è¡¨ç¤ºä¸ç¬é¶è¡å è´¦æ·ä¹é´ç转账交æ
- é¶è¡è´¦æ·èåæ ¹ï¼å°è£ é¶è¡è´¦æ·ä½é¢åå¨çæ°æ®ä¸è´æ§
æµè¯
- æ¥å
¥äº
OpenApi 3.0
ï¼æå¼swagger-ui
å³å¯ã
http://localhost:8080/swagger-ui.html
FAQ
èåæ ¹çå®ä¹
èåæ ¹éè¦å®ä¹ä¸ä¸ªæ åæé å½æ°ï¼å 为èåæ ¹åå§åæ¶ä½¿ç¨äºï¼
aggregateRootType.getDeclaredConstructor().newInstance();
为ä»ä¹éç¨å¼æ¥åä¸é¿è¿æ¥?
å 为æå¡çç°ç¶å¤§é½æ¯æå¡æä¾è
å°ï¼é常åªæå å°æºå¨ï¼èæå¡çæ¶è´¹è
å¤ï¼å¯è½æ´ä¸ªç½ç«é½å¨è®¿é®è¯¥æå¡ã
å¨æ们çè¿ä¸ªåºæ¯éé¢ï¼command-web
åªéè¦å¾å°çæºå¨å°±è½æ»¡è¶³å端大éç请æ±ï¼command-consumer
åevent-consumer
çæºå¨ç¸å¯¹è¾å¤äºã
å¦æéç¨å¸¸è§çâå请æ±åè¿æ¥âçæ¹å¼ï¼æå¡æä¾è
å¾å®¹æ就被åè·¨ï¼éè¿åä¸è¿æ¥ï¼ä¿è¯åä¸æ¶è´¹è
ä¸ä¼åæ»æä¾è
ï¼é¿è¿æ¥ï¼åå°è¿æ¥æ¡æéªè¯çï¼å¹¶ä½¿ç¨å¼æ¥IO
ï¼å¤ç¨çº¿ç¨æ± ï¼é²æ¢C10K
é®é¢ã
CommandHandler
åCommandAsyncHandler
åºå« (ç°å¨ç»ä¸æä¸ä¸ªäº)
-
CommandHandler
æ¯ä¸ºäºæä½å åä¸çèåæ ¹çï¼æ以ä¸ä¼æå¼æ¥æä½ï¼ä½åæ¥CommandHandler
çHandle
æ¹æ³ä¹è®¾è®¡ä¸ºäºhandleAsync
äºï¼ç®çæ¯ä¸ºäºå¼æ¥å°åºï¼å¦åå¼æ¥é¾è·¯ä¸æçè¯ï¼å¼æ¥å°±æ²¡ææäº -
CommandAsyncHandler
æ¯ä¸ºäºè®©å¼åè è°ç¨å¤é¨ç³»ç»çæ¥å£çï¼ä¹å°±æ¯è®¿é®å¤é¨IO
ï¼æ以ç¨äº`Async
CommandHandler
ï¼CommandAsyncHandler
è¿ä¸¤ä¸ªæ¥å£æ¯ç¨äºä¸åçä¸å¡åºæ¯ï¼CommandHandler.handleAsync
æ¹æ³æ§è¡å®æåï¼æ¡æ¶è¦ä»context
ä¸è·åå½åä¿®æ¹çèåæ ¹çé¢åäºä»¶ï¼ç¶åå»æ交ãèCommandAsyncHandler.handleAsync
æ¹æ³æ§è¡å®æåï¼ä¸ä¼æè¿ä¸ªé»è¾ï¼èæ¯çä¸ä¸handleAsync
æ¹æ³æ§è¡çå¼æ¥æ¶æ¯ç»ææ¯ä»ä¹ï¼ä¹å°±æ¯IApplicationMessage
ã ç®åå·²ç»å é¤äºCommandAsyncHandler
ï¼ç»ä¸ä½¿ç¨CommandHandler
æ¥å¤çï¼å¼æ¥ç»æä¼æ¾å¨context
ä¸ï¼éè¿è®¿é®#setResult
设置
CommandBus
sendAsync
å executeAsync
çåºå«
sendAsync
åªå
³æ³¨åéæ¶æ¯çç»æ
executeAsync
åéæ¶æ¯çåæ¶ï¼å
³æ³¨å½ä»¤çæ§è¡ç»æï¼è¿åçæ¶æºå¦ä¸ï¼
-
CommandReturnType.CommandExecuted
ï¼Command
æ§è¡å®æï¼Event
åå¸æååè¿åç»æ -
CommandReturnType.EventHandled
ï¼Event
å¤çå®æåæè¿åç»æ
event
使ç¨åªä¸ªè®¢é
è
åéå¤çç»æ
event
ç订é
è
å¯è½æå¾å¤ä¸ªï¼æ以enode
åªè¦æ±æä¸ä¸ªè®¢é
è
å¤çå®äºä»¶ååéç»æç»åéå½ä»¤ç人å³å¯ï¼éè¿defaultDomainEventMessageHandler
ä¸sendEventHandledMessage
åæ°æ¥è®¾ç½®æ¯å¦åéï¼æç»æ¥å³å®ç±åªä¸ªè®¢é
è
æ¥åéå½ä»¤å¤çç»æã
åè项ç®
- https://github.com/tangxuehua/enode
- https://github.com/coffeewar/enode-master