今天将会介绍如果使用rabbitmq进行简单的消息入队,出队操作,因为本文演示的环境要用到上文中配置的环境,所以要运行本文sample,请先按上一篇中完成相应环境配置。
首先,我们下载官方的.net客户端软件,链接: 下载并安装之后,将安装目录下的这两个DLL文件复制到我们示例项目中,并添加引用: RabbitMQ.Client.dll //基于的发布订阅消息的功能类 RabbitMQ.ServiceModel.dll //包括基于WCF方式发布订阅服务模型类
如下图: 接着,我们创建两个类,一个是ProducerMQ.cs(用于产生消息),一个是CustmerMq.cs(用于消费消息),代码如下: 首先是ProducerMQ:
public class ProducerMQ { public static void InitProducerMQ() { Uri uri = new Uri( " amqp://10.0.4.85:5672/ " ); string exchange = " ex1 " ; string exchangeType = " direct " ; string routingKey = " m1 " ; bool persistMode = true ; ConnectionFactory cf = new ConnectionFactory(); cf.UserName = " daizhj " ; cf.Password = " 617595 " ; cf.VirtualHost = " dnt_mq " ; cf.RequestedHeartbeat = 0 ; cf.Endpoint = new AmqpTcpEndpoint(uri); using (IConnection conn = cf.CreateConnection()) { using (IModel ch = conn.CreateModel()) { if (exchangeType != null ) { ch.ExchangeDeclare(exchange, exchangeType); // ,true,true,false,false, true,null); ch.QueueDeclare( " q1 " , true ); // true, true, true, false, false, null); ch.QueueBind( " q1 " , " ex1 " , " m1 " , false , null ); } IMapMessageBuilder b = new MapMessageBuilder(ch); IDictionary target = b.Headers; target[ " header " ] = " hello world " ; IDictionary targetBody = b.Body; targetBody[ " body " ] = " daizhj " ; if (persistMode) { ((IBasicProperties)b.GetContentHeader()).DeliveryMode = 2 ; } ch.BasicPublish(exchange, routingKey, (IBasicProperties)b.GetContentHeader(), b.GetContentBody()); } } } }
下面对上面代码进行说明:
1. 定义要链接的rabbitmq-server地址(基于amqp协议): Uri uri = new Uri("amqp://10.0.4.85:5672/");
2. 定义交换方式
string exchange = " ex1 " ; string exchangeType = " direct " ; string routingKey = " m1 " ;
说明:rabbitmq交换方式分为三种,分别是: Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。 Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。 Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。 更多内容参见: 3. 是否对消息队列持久化保存
bool persistMode = true ;
4. 使用ConnectionFactory创建连接,虽然创建时指定了多个server address,但每个connection只与一个物理的server进行连接。
ConnectionFactory cf = new ConnectionFactory(); // 使用前文的配置环境信息 cf.UserName = " daizhj " ; cf.Password = " 617595 " ; cf.VirtualHost = " dnt_mq " ; cf.RequestedHeartbeat = 0 ; cf.Endpoint = new AmqpTcpEndpoint(uri);
5. 实例化IConnection对象,并设置交换方式
using (IConnection conn = cf.CreateConnection()) { using (IModel ch = conn.CreateModel()) { if (exchangeType != null ) { ch.ExchangeDeclare(exchange, exchangeType); // ,true,true,false,false, true,null); ch.QueueDeclare( " q1 " , true ); // true, true, true, false, false, null); ch.QueueBind( " q1 " , " ex1 " , " m1 " , false , null ); } ....
6. 构造消息实体对象并发布到消息队列上:
IMapMessageBuilder b = new MapMessageBuilder(ch); IDictionary target = b.Headers; target[ " header " ] = " hello world " ; IDictionary targetBody = b.Body; targetBody[ " body " ] = " daizhj " ; if (persistMode) { ((IBasicProperties)b.GetContentHeader()).DeliveryMode = 2 ; } // 简单发布方式 ch.BasicPublish(exchange, routingKey, (IBasicProperties)b.GetContentHeader(), b.GetContentBody());
这样就完成了单条消息的发布。 下面是CustmerMq.cs(用于消费消息)实例代码:
public class CustmerMq { public static int InitCustmerMq() { string exchange = " ex1 " ; string exchangeType = " direct " ; string routingKey = " m1 " ; string serverAddress = " 10.0.4.85:5672 " ; ConnectionFactory cf = new ConnectionFactory(); cf.Address = serverAddress; cf.UserName = " daizhj " ; cf.Password = " 617595 " ; cf.VirtualHost = " dnt_mq " ; cf.RequestedHeartbeat = 0 ;
可以看出上面的代码与 ProducerMQ的开头代码类似,下面使用ConnectionFactory来构造链接并接收队列消息:
using (IConnection conn = cf.CreateConnection()) { using (IModel ch = conn.CreateModel()) { // 普通使用方式BasicGet // noAck = true,不需要回复,接收到消息后,queue上的消息就会清除 // noAck = false,需要回复,接收到消息后,queue上的消息不会被清除,直到调用channel.basicAck(deliveryTag, false); queue上的消息才会被清除 而且,在当前连接断开以前,其它客户端将不能收到此queue上的消息 BasicGetResult res = ch.BasicGet( " q1 " , false /* noAck */ ); if (res != null ) { bool t = res.Redelivered; t = true ; Console.WriteLine(System.Text.UTF8Encoding.UTF8.GetString(res.Body)); ch.BasicAck(res.DeliveryTag, false ); } else { Console.WriteLine( " No message! " ); }
上面代码比较简单,主要是使用BasicGetResult来进行简单的消息接收,并使用BasicAck方式来告之是否从队列中移除该条消息。这一点很重要,因为在某些应用场景下,比如从队列中获取消息并用它来操作数据库或日志文件时,如果出现操作失败时,则该条消息应该保留在队列中,只到操作成功时才从队列中移除。
当然上面操作只是用于单条数据操作,如果要遍历队列中所有消息,则需要使用如下方式: while ( true ) { BasicGetResult res = ch.BasicGet( " q1 " , false /* noAck */ ); if (res != null ) { try { bool t = res.Redelivered; t = true ; Console.WriteLine(System.Text.UTF8Encoding.UTF8.GetString(res.Body)); ch.BasicAck(res.DeliveryTag, false ); } catch { } } else break ; }
另外,在rabbitmq中,获取消息可以使用两种方式,一种是上面提到的主动获取,另一种是基于订阅模式,即让当前获取消息的线程阻塞,用于绑定到指定的队列上,当有新的消息入队之后,该阻塞线程会被运行,从队列中获取新入队的消息,形如:
// 第二种取法QueueingBasicConsumer基于订阅模式 QueueingBasicConsumer consumer = new QueueingBasicConsumer(ch); ch.BasicConsume( " q1 " , false , null , consumer); while ( true ) { try { BasicDeliverEventArgs e = (BasicDeliverEventArgs)consumer.Queue.Dequeue(); IBasicProperties props = e.BasicProperties; byte [] body = e.Body; Console.WriteLine(System.Text.Encoding.UTF8.GetString(body)); // ch.BasicAck(e.DeliveryTag, true); ProcessRemainMessage(); } catch (EndOfStreamException ex) { // The consumer was removed, either through channel or connection closure, or through the action of IModel.BasicCancel(). Console.WriteLine(ex.ToString()); break ; } }
这样,就完成了一个简单的发布,消费消息的示例。在接下来的文章中,将会介绍如果基于WCF来发布RABBITMQ服务,敬请关注:)