博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
c#操作rabbitmq
阅读量:5075 次
发布时间:2019-06-12

本文共 5648 字,大约阅读时间需要 18 分钟。

今天将会介绍如果使用rabbitmq进行简单的消息入队,出队操作,因为本文演示的环境要用到上文中配置的环境,所以要运行本文sample,请先按上一篇中完成相应环境配置。

   
      首先,我们下载官方的.net客户端软件,链接: 
   
      下载并安装之后,将安装目录下的这两个DLL文件复制到我们示例项目中,并添加引用:

RabbitMQ.Client.dll //基于的发布订阅消息的功能类   
RabbitMQ.ServiceModel.dll //包括基于WCF方式发布订阅服务模型类

    

       如下图:
    
   
       接着,我们创建两个类,一个是ProducerMQ.cs(用于产生消息),一个是CustmerMq.cs(用于消费消息),代码如下:
   
       首先是ProducerMQ:

public     class   ProducerMQ
{
      public     static      void   InitProducerMQ()
    {
        Uri uri   =     new   Uri(  "  amqp://10.0.4.85:5672/  "  );
          string   exchange   =     "  ex1  "  ;
          string   exchangeType   =     "  direct  "  ;
          string   routingKey   =     "  m1  "  ;
          bool   persistMode   =     true  ;
        ConnectionFactory cf   =     new   ConnectionFactory();
      
        cf.UserName   =     "  daizhj  "  ;
        cf.Password   =     "  617595  "  ;
        cf.VirtualHost   =     "  dnt_mq  "  ;
        cf.RequestedHeartbeat   =     0  ;
        cf.Endpoint   =     new   AmqpTcpEndpoint(uri);
          using   (IConnection conn   =   cf.CreateConnection())
        {
              using   (IModel ch   =   conn.CreateModel())
            {
                  if   (exchangeType   !=     null  )
                {
                    ch.ExchangeDeclare(exchange, exchangeType);  //  ,true,true,false,false, true,null); 
                    ch.QueueDeclare(  "  q1  "  ,   true  );  //  true, true, true, false, false, null); 
                    ch.QueueBind(  "  q1  "  ,   "  ex1  "  ,   "  m1  "  ,   false  ,   null  ); 
                }
                IMapMessageBuilder b   =     new   MapMessageBuilder(ch);
                IDictionary target   =   b.Headers;
                target[  "  header  "  ]   =     "  hello world  "  ;
                IDictionary targetBody   =   b.Body;
                targetBody[  "  body  "  ]   =     "  daizhj  "  ;
                  if   (persistMode)
                {
                    ((IBasicProperties)b.GetContentHeader()).DeliveryMode   =     2  ;
                }
             
                ch.BasicPublish(exchange, routingKey,
                                           (IBasicProperties)b.GetContentHeader(),
                                           b.GetContentBody());
            }
        }
    }
}

 

 

    下面对上面代码进行说明:

    1.  定义要链接的rabbitmq-server地址(基于amqp协议):

Uri uri = new Uri("amqp://10.0.4.85:5672/");

                 

    2.  定义交换方式       

string   exchange   =     "  ex1  "  ;
string   exchangeType   =     "  direct  "  ;
string   routingKey   =     "  m1  "  ;

           

        说明:rabbitmq交换方式分为三种,分别是:
        Direct Exchange – 处理路由键。需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 “dog”,则只有被标记为“dog”的消息才被转发,不会转发dog.puppy,也不会转发dog.guard,只会转发dog。 
        Fanout Exchange – 不处理路由键。你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout交换机转发消息是最快的。 
        Topic Exchange – 将路由键和某模式进行匹配。此时队列需要绑定要一个模式上。符号“#”匹配一个或多个词,符号“*”匹配不多不少一个词。因此“audit.#”能够匹配到“audit.irs.corporate”,但是“audit.*” 只会匹配到“audit.irs”。
        更多内容参见:   
        
     3. 是否对消息队列持久化保存

bool   persistMode   =     true  ;

 

     

     4. 使用ConnectionFactory创建连接,虽然创建时指定了多个server address,但每个connection只与一个物理的server进行连接。

       ConnectionFactory cf   =     new   ConnectionFactory();    
          //  使用前文的配置环境信息   
        cf.UserName   =     "  daizhj  "  ; 
        cf.Password   =     "  617595  "  ;
        cf.VirtualHost   =     "  dnt_mq  "  ;
        cf.RequestedHeartbeat   =     0  ;
        cf.Endpoint   =     new   AmqpTcpEndpoint(uri);

 

        

     5. 实例化IConnection对象,并设置交换方式       

   using   (IConnection conn   =   cf.CreateConnection())
            {
                  using   (IModel ch   =   conn.CreateModel())
                {
                      if   (exchangeType   !=     null  )
                    {
                        ch.ExchangeDeclare(exchange, exchangeType);  //  ,true,true,false,false, true,null); 
                        ch.QueueDeclare(  "  q1  "  ,   true  );  //  true, true, true, false, false, null); 
                        ch.QueueBind(  "  q1  "  ,   "  ex1  "  ,   "  m1  "  ,   false  ,   null  ); 
                    }
        ....

 

        

     6. 构造消息实体对象并发布到消息队列上:

  IMapMessageBuilder b   =     new   MapMessageBuilder(ch);
  IDictionary target   =   b.Headers;
  target[  "  header  "  ]   =     "  hello world  "  ;
  IDictionary targetBody   =   b.Body;
  targetBody[  "  body  "  ]   =     "  daizhj  "  ;
    if   (persistMode)
  {
    ((IBasicProperties)b.GetContentHeader()).DeliveryMode   =     2  ;
  }
    //  简单发布方式 
  ch.BasicPublish(exchange, routingKey,
          (IBasicProperties)b.GetContentHeader(),
          b.GetContentBody());

 

          

    这样就完成了单条消息的发布。
    
    下面是CustmerMq.cs(用于消费消息)实例代码:    

public     class   CustmerMq
    {
          public     static     int   InitCustmerMq()
        {
              string   exchange   =     "  ex1  "  ;
              string   exchangeType   =     "  direct  "  ;
              string   routingKey   =     "  m1  "  ;
              string   serverAddress   =     "  10.0.4.85:5672  "  ;
            ConnectionFactory cf   =     new   ConnectionFactory();
            cf.Address   =   serverAddress;
            cf.UserName   =     "  daizhj  "  ;
            cf.Password   =     "  617595  "  ;
            cf.VirtualHost   =     "  dnt_mq  "  ;
            cf.RequestedHeartbeat   =     0  ;

 

            

      可以看出上面的代码与 ProducerMQ的开头代码类似,下面使用ConnectionFactory来构造链接并接收队列消息:           

   using   (IConnection conn   =   cf.CreateConnection())
            {
                  using   (IModel ch   =   conn.CreateModel())
                {
                      //  普通使用方式BasicGet
                      //  noAck = true,不需要回复,接收到消息后,queue上的消息就会清除
                      //  noAck = false,需要回复,接收到消息后,queue上的消息不会被清除,直到调用channel.basicAck(deliveryTag, false); queue上的消息才会被清除 而且,在当前连接断开以前,其它客户端将不能收到此queue上的消息 
                    BasicGetResult res   =   ch.BasicGet(  "  q1  "  ,   false  /*  noAck  */  );
                      if   (res   !=     null  )
                    {
                          bool   t   =   res.Redelivered;
                        t   =     true  ;
                        Console.WriteLine(System.Text.UTF8Encoding.UTF8.GetString(res.Body));
                        ch.BasicAck(res.DeliveryTag,   false  );
                    }
                      else 
                    {
                        Console.WriteLine(  "  No message!  "  );
                    }  

 

 

      上面代码比较简单,主要是使用BasicGetResult来进行简单的消息接收,并使用BasicAck方式来告之是否从队列中移除该条消息。这一点很重要,因为在某些应用场景下,比如从队列中获取消息并用它来操作数据库或日志文件时,如果出现操作失败时,则该条消息应该保留在队列中,只到操作成功时才从队列中移除。

  
      当然上面操作只是用于单条数据操作,如果要遍历队列中所有消息,则需要使用如下方式:

while   (  true  )
  {
      BasicGetResult res   =   ch.BasicGet(  "  q1  "  ,   false  /*  noAck  */  );
        if   (res   !=     null  )
      {
            try 
          {
                 bool   t   =   res.Redelivered;
                        t   =     true  ;
                        Console.WriteLine(System.Text.UTF8Encoding.UTF8.GetString(res.Body));
                        ch.BasicAck(res.DeliveryTag,   false  );
          }
            catch   { }
      }
        else 
            break  ;
  }

              

            
     另外,在rabbitmq中,获取消息可以使用两种方式,一种是上面提到的主动获取,另一种是基于订阅模式,即让当前获取消息的线程阻塞,用于绑定到指定的队列上,当有新的消息入队之后,该阻塞线程会被运行,从队列中获取新入队的消息,形如:

   //  第二种取法QueueingBasicConsumer基于订阅模式 
 QueueingBasicConsumer consumer   =     new   QueueingBasicConsumer(ch);
 ch.BasicConsume(  "  q1  "  ,   false  ,   null  , consumer);
   while   (  true  )
 {
       try 
     {
         BasicDeliverEventArgs e   =   (BasicDeliverEventArgs)consumer.Queue.Dequeue();
         IBasicProperties props   =   e.BasicProperties;
           byte  [] body   =   e.Body;
         Console.WriteLine(System.Text.Encoding.UTF8.GetString(body));
           //  ch.BasicAck(e.DeliveryTag, true); 
         ProcessRemainMessage();                          
     }
       catch   (EndOfStreamException ex)  
     {
           // The consumer was removed, either through channel or connection closure, or through the action of IModel.BasicCancel(). 
         Console.WriteLine(ex.ToString());
           break  ;
     }
 }

 

  
     这样,就完成了一个简单的发布,消费消息的示例。在接下来的文章中,将会介绍如果基于WCF来发布RABBITMQ服务,敬请关注:)

转载于:https://www.cnblogs.com/systemnet123/p/3274558.html

你可能感兴趣的文章
使用cocoscreator + node.js + websocket实现简单的聊天服务
查看>>
什么是预测区间,置信区间与预测区间二者的异同是什么?
查看>>
asp.net (jquery easy-ui datagrid)通用Excel文件导出(NPOI)
查看>>
ubuntuPC机安装JLink驱动
查看>>
快速排序
查看>>
我的第一篇随笔
查看>>
设置Eclipse/MyEclipse中编辑界面点击任何文件后Package Explorer导航自动定位该文件...
查看>>
多阶段决策问题
查看>>
C# write in pdf file
查看>>
jQuery.Pin钉住某个元素
查看>>
[笔记][FPGA]有限状态机FSM学习笔记(三)
查看>>
Hotel Check in & check out
查看>>
数组间赋值
查看>>
nignx配置文件语法高亮
查看>>
。标识符命名规则
查看>>
JAVA课程课后作业之使用递归完成回文
查看>>
搞了一天,用SharedPreference 实现网络请求的JSON数据的本地存储
查看>>
DNS、链接网页、资源预加载处理
查看>>
Parrot Linux国内源
查看>>
在Ubuntu 14.04 64bit中永久添加DNS的方法
查看>>