CAP结合RabbitMQ分布式事务快速开始
CAP结合RabbitMQ分布式事务快速开始
前言
上节介绍了CAP结合内存,这节结合RabbitMQ并集合数据库持久化数据。
CAP是一个在分布式系统(SOA)或微服务系统(MicroService)中实现事件总线及最终一致性(分布式事务)的一个开源的C#库,具有轻量级,高性能,易使用等特点。
CAP 具有Event Bus的所有功能,简化EventBus中的发布/订阅
CAP 具有消息持久化的功能,服务进行重启或者宕机不比担心消息丢失保证可靠性
Cap支持事务,通过捕获数据库上下文连接对象实现消息事务,消息持久化
可以简单理解:
使用起来非常简单,主要通过这个类来实现
l 发布:ICapPublisher
l 订阅:CapSubscribe
环境
l Win10
l VS2022
l .NET5.0
l DotNetCore.CAP 5.0.1
l DotNetCore.CAP.RabbitMQ 5.0.1
l DotNetCore.CAP.SqlServer 5.0.1
l CAP.Dashboard 5.0.0
l Microsoft.EntityFrameworkCore.Design 5.0.0
l SQLserver2012
项目实践
实现功能是通过一个接口发布消息到另外一个接口,并向数据库中插入数据,通过CAP看板和RabbitMQ看板来查看变化。
新建项目
新建四个项目
DB项目:“Yak.Cap.RabbitMQ.DB”
模型项目:“Yak.Cap.RabbitMQ.Models”
发布接口项目:“Yak.Cap.RabbitMQ.PublisherApi”
订阅接口项目“Yak.Cap.RabbitMQ.SubscribeApi”
DB项目
DB项目用于访问数据库,在发布消息时保存用户数据到数据库,这里使用SQLserver2012。
添加后所有的依赖有:
<ItemGroup> <PackageReference Include="Microsoft.EntityFrameworkCore" Version="5.0.0" /> <PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="5.0.0"> </PackageReference> <PackageReference Include="Microsoft.EntityFrameworkCore.SqlServer" Version="5.0.0" /> <PackageReference Include="Microsoft.EntityFrameworkCore.Tools" Version="5.0.0"> </PackageReference> </ItemGroup> |
添加数据库上下文
public class CapRabbitMQDbContext : DbContext { public CapRabbitMQDbContext(DbContextOptions<CapRabbitMQDbContext> options) : base(options) { } /// <summary> /// 重写父类的方法 用于连接数据库 /// </summary> /// <param name="optionsBuilder"></param> protected override void OnConfiguring(DbContextOptionsBuilder optionsBuilder) { if (!optionsBuilder.IsConfigured) { optionsBuilder.UseSqlServer("Data Source=.;database=CapDb;uid=sa;pwd=sa123456"); } } public DbSet<Sys_User> Users { get; set; } } |
模型项目
提供模型
[Table("Sys_User")] public class Sys_User { public int Id { get; set; } /// <summary> /// 用户名 /// </summary> public string Name { get; set; } /// <summary> /// 手机号码 /// </summary> public string C_Mobile { get; set; } } |
发布消息接口项目
添加后所有的依赖有:
<ItemGroup> <PackageReference Include="DotNetCore.CAP.Dashboard" Version="5.0.0" /> <PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="5.0.0" /> <PackageReference Include="DotNetCore.CAP" Version="5.0.1" /> <PackageReference Include="DotNetCore.CAP.RabbitMQ" Version="5.0.1" /> <PackageReference Include="DotNetCore.CAP.SqlServer" Version="5.0.1" /> </ItemGroup> |
在 Startup.cs 中,添加以下配置:
public void ConfigureServices(IServiceCollection services) {
services.AddControllers(); //添加数据库上下文服务 services.AddDbContext<CapRabbitMQDbContext>(); //添加事件总线cap services.AddCap(x => { // 使用内存存储消息(消息发送失败处理) //x.UseInMemoryStorage();
x.UseEntityFramework<CapRabbitMQDbContext>();
//使用RabbitMQ进行事件中心处理 x.UseRabbitMQ(rb => { rb.HostName = "localhost"; rb.UserName = "guest"; rb.Password = "guest"; rb.Port = 5672; rb.VirtualHost = "/"; }); //启用仪表盘 x.UseDashboard(); }); } |
添加Publish控制器,发送时间消息:
public class PublishController : Controller { private readonly ICapPublisher _capBus;
public PublishController(ICapPublisher capPublisher) { _capBus = capPublisher; } [Route("~/ef/transaction")] public IActionResult EntityFrameworkWithTransaction([FromServices] CapRabbitMQDbContext dbContext) { using (var trans = dbContext.Database.BeginTransaction(_capBus, autoCommit: true)) { //业务代码 dbContext.Add<Sys_User>(new Sys_User {Name = "yak", C_Mobile = "18221546985" }); dbContext.SaveChanges(); _capBus.Publish("test.show.time", DateTime.Now); } return Ok(); } } |
订阅接口项目
添加后所有的依赖有:
<ItemGroup> <PackageReference Include="DotNetCore.CAP.Dashboard" Version="5.0.0" /> <PackageReference Include="Microsoft.EntityFrameworkCore.Design" Version="5.0.0" /> <PackageReference Include="DotNetCore.CAP" Version="5.0.1" /> <PackageReference Include="DotNetCore.CAP.RabbitMQ" Version="5.0.1" /> <PackageReference Include="DotNetCore.CAP.SqlServer" Version="5.0.1" /> </ItemGroup> |
在 Startup.cs 中,添加以下配置:
public void ConfigureServices(IServiceCollection services) {
services.AddControllers(); //添加数据库上下文服务 services.AddDbContext<CapRabbitMQDbContext>(); //添加事件总线cap services.AddCap(x => { // 使用内存存储消息(消息发送失败处理) //x.UseInMemoryStorage();
x.UseEntityFramework<CapRabbitMQDbContext>();
//使用RabbitMQ进行事件中心处理 x.UseRabbitMQ(rb => { rb.HostName = "localhost"; rb.UserName = "guest"; rb.Password = "guest"; rb.Port = 5672; rb.VirtualHost = "/"; }); //启用仪表盘 x.UseDashboard(); }); } |
添加Publish控制器,发送时间消息:
public class ConsumerController : Controller { [NonAction] [CapSubscribe("test.show.time")] public void ReceiveMessage(DateTime time) { Console.WriteLine("message time is:" + time); }
} |
调试
启动两个接口项目,访问http://localhost:5000/ef/transaction接口后
订阅接口接受到时间:
访问http://localhost:5000/cap,打开CAP面板。
访问http://localhost:15672/#/,打开RabbitMQ面板。
数据库中新建了用户数据
总结
通过此实践了解了CAP结合RabbitMQ和数据库SQLServer进行消息发送订阅及持久化的功能(此实际仅仅保存了用户),服务进行重启或者宕机不比担心消息丢失保证可靠性。
CAP的实现要更加的严谨、更加强大,我们不需要建过程表,也不需要处理消息队列的问题,底层很多的细节都不需要我们考虑,只管用就好了。
CAP数据库存储支持:Sql Server,MySql,PostgreSql,MongoDB。
消息队列支持:RabbitMQ,Kafka,Azure Service Bus等。
CAP同时支持使用 EntityFrameworkCore 和 ADO.NET 的项目,你可以根据需要选择不同的配置方式。
鸣谢
https://cap.dotnetcore.xyz/user-guide/zh/storage/sqlserver/
https://www.cnblogs.com/liyouming/p/9150961.html
https://www.cnblogs.com/savorboard/p/cap.html
源码
https://github.com/yandaniugithub/CAP
最后,欢迎各位大佬们打赏、点赞和评论指正
版权所有,转载请注明出处:
https://www.cnblogs.com/yakniu/p/16204192.html