在C#中使用RabbitMQ
官方提供了 6 中应用场景使用教程: https://www.rabbitmq.com/getstarted.html
使用消息队列的好处:
- 业务系统往往要求相应能力特别强,起到削峰填谷的作用。
- 解耦和高可用。如果一个系统挂了,不会影响到其他系统的运行。
- 业务系统往往有对消息的高可靠要求,以及有对复杂功能(如ACK)的要求。
- 增强业务系统的异步处理能力,减少甚至几乎不可能出现并发现象。
该案例使用 RabbitMQ.Client 包
- 基本用法
<appSettings> <add key="AppID" value="150107"/> <add key="RabbitMQUri" value="amqp://test:123456@localhost:5672" /> </appSettings>
/// <summary> /// 发送消息 /// </summary> public class Send { private static readonly string appID = ConfigurationManager.AppSettings["AppID"]; static void Main(string[] args) { var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { string queue = string.Format("MQ{0}.BaseStudy", appID); channel.QueueDeclare(queue, false, false, false, null); //定义一个队列 while (true) { Console.Write("请输入要发送的消息:"); var message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("", queue, null, body); //发送消息 Console.WriteLine("已发送的消息: {0}", message); } } } } }
/// <summary> /// 接收消息 /// </summary> public class Receive { private static readonly string appID = ConfigurationManager.AppSettings["AppID"]; static void Main(string[] args) { var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { string queue = string.Format("MQ{0}.BaseStudy", appID); channel.QueueDeclare(queue, false, false, false, null); //定义一个队列 Console.WriteLine("准备接收消息:"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (s, e) => { var body = e.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine("接收到的消息: {0}", message); }; channel.BasicConsume(queue, true, consumer); //开启消费者与通道、队列关联 Console.ReadLine(); } } } }
- 主题发布订阅
/// <summary> /// 发送消息,采用主题模式 /// </summary> public class Send { private static readonly string appID = ConfigurationManager.AppSettings["AppID"]; static void Main(string[] args) { var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { string exchange = string.Format("Ex{0}.Logs", appID); channel.ExchangeDeclare(exchange, "topic"); //声明创建一个交换机,交换机类型设定为‘topic’ while (true) { Console.Write("请输入要发送的消息,输入格式如'RoutingKey_Message':"); var keyWithMsg = Console.ReadLine(); args = keyWithMsg.Split('_'); var routingKey = args.Length > 1 ? args[0] : "*.rabbit"; var message = args.Length > 1 ? args[1] : "Hello World"; var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish(exchange, routingKey, null, body); //发布消息 Console.WriteLine("已发送的消息: '{0}':'{1}'", routingKey, message); } } } } }
/// <summary> /// 接收消息,采用主题模式 /// </summary> public class Receive { private static readonly string appID = ConfigurationManager.AppSettings["AppID"]; static void Main(string[] args) { var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { string exchange = string.Format("Ex{0}.Logs", appID); channel.ExchangeDeclare(exchange, "topic"); //声明创建一个交换机,交换机类型设定为‘topic’ var queueName = channel.QueueDeclare().QueueName; //获取连接通道所使用的队列 Console.Write("请输入准备监听的消息主题格式,格式如'*.rabbit'或者'info.*'或者'info.warning.error'等:"); while (true) { var bindingKey = Console.ReadLine(); channel.QueueBind(queueName, exchange, bindingKey); //队列绑定到交换机 Console.WriteLine("准备接收消息"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (s, e) => { var routingKey = e.RoutingKey; var body = e.Body; var message = Encoding.UTF8.GetString(body); Console.WriteLine("接收到的消息: '{0}':'{1}'", routingKey, message); }; channel.BasicConsume(queueName, true, consumer); //开启消费者与通道、队列关联 } } } } }
高级特性
/// <summary> /// 发送消息 /// </summary> public class Send { private static readonly string appID = ConfigurationManager.AppSettings["AppID"]; static void Main(string[] args) { var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { string queue = string.Format("MQ{0}.TaskQueue", appID); channel.QueueDeclare(queue, true, false, false, null); //定义一个支持持久化的消息队列 var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 2; //1表示不持久,2表示持久化 Console.WriteLine("请注意:演示耗时较长的消息时,可通过发送带有‘.’的内容去模拟,每个‘.’加1秒!"); while (true) { Console.Write("请输入要发送的消息:"); var message = Console.ReadLine(); var body = Encoding.UTF8.GetBytes(message); channel.BasicPublish("", queue, properties, body); //发送消息 Console.WriteLine("已发送的消息: {0}", message); } } } } }
/// <summary> /// 接收消息 /// </summary> public class Receive { private static readonly string appID = ConfigurationManager.AppSettings["AppID"]; static void Main(string[] args) { var factory = new ConnectionFactory { Uri = ConfigurationManager.AppSettings["RabbitMQUri"] }; using (var connection = factory.CreateConnection()) { using (var channel = connection.CreateModel()) { string queue = string.Format("MQ{0}.TaskQueue", appID); channel.QueueDeclare(queue, true, false, false, null); //定义一个支持持久化的消息队列 channel.BasicQos(0, 1, false); //在一个消费者还在处理消息且没响应消息之前,不要给他分发新的消息,而是将这条新的消息发送给下一个不那么忙碌的消费者 Console.WriteLine("准备接收消息:"); var consumer = new EventingBasicConsumer(channel); consumer.Received += (s, e) => { var message = Encoding.UTF8.GetString(e.Body); SimulationTask(message); channel.BasicAck(e.DeliveryTag, false); //手动Ack:用来确认消息已经被消费完成了 }; channel.BasicConsume(queue, false, consumer); //开启消费者与通道、队列关联 //channel.BasicConsume(queue, true, consumer); //开启消费者与通道、队列关联;自动Ack Console.ReadLine(); } } } /// <summary> /// 模拟消息任务的处理过程 /// </summary> /// <param name="message">消息</param> private static void SimulationTask(string message) { Console.WriteLine("接收的消息: {0}", message); int dots = message.Split('.').Length - 1; Thread.Sleep(dots * 1000); Console.WriteLine("接收的消息处理完成,现在时间为{0}!", DateTime.Now); } }