前一阵子写SEDA异步框架的时候,使用了rabbitmq作为分布式消息队列(见前文),把一些学习官网教程的笔记简单备忘。示例都来自官网
Part 2 Work Queues
1、round-robin dispatchering
缺陷:存在不能准确负载均衡的弊端
2、fair dispatch --> 针对管道
采用channel.basicQos(prefetchCount); 缺陷:可能带来队列塞
3、message acknowledge --> 针对消费者
boolean autoAck = false;channel.basicConsume("hello", autoAck, consumer);//默认是true,别用反了
while (true) {
QueueingConsumer.Delivery delivery = consumer.nextDelivery();
//...
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);// 记得手动加上发送ack。不然server会认为client没收到
}
4、Message durability --> 针对队列
采用boolean durable = true;channel.queueDeclare("hello", durable, false, false, null);
5、persistent --> 针对发布者
import com.rabbitmq.client.MessageProperties;
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
example:
public class NewTask { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); String message = getMessage(argv); channel.basicPublish( "", TASK_QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... }
public class Worker { private static final String TASK_QUEUE_NAME = "task_queue"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); channel.basicQos(1); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(TASK_QUEUE_NAME, false, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); doWork(message); System.out.println(" [x] Done" ); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } } //... }
Part 3 Publish/Subscribe
1、Exchanges(类似block)
type:direct, topic, headers, fanout(类似广播)
Declared way: channel.exchangeDeclare("logs", "fanout");
2、Temporary queue
Non-durable, exclusive, autodelete
String queueName = channel.queueDeclare().getQueue();
3、Bindings
channel.queueBind(queueName, "logs", "");
example:
public class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); channel.close(); connection.close(); } //... } public class ReceiveLogs { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
Part 4 Routing
1、Direct exchange
type:direct, topic, headers, fanout(类似广播)
Declared way: channel.exchangeDeclare("logs", "fanout");
2、Multiple bindings
example:
public class EmitLogDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String severity = getSeverity(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes()); System.out.println(" [x] Sent '" + severity + "':'" + message + "'"); channel.close(); connection.close(); } //.. } public class ReceiveLogsDirect { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] argv) throws java.io.IOException, java.lang.InterruptedException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "direct"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsDirect [info] [warning] [error]"); System.exit(1); } for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } }
1、Topic exchange
比direct exchange方式好的地方是有通配符的概念在里面
* (star) can substitute for exactly one word.
# (hash) can substitute for zero or more words.
Delimited by dots
Routing_key up to the limit of 255 bytes
example:
public class EmitLogTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String routingKey = getRouting(argv); String message = getMessage(argv); channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes()); System.out.println(" [x] Sent '" + routingKey + "':'" + message + "'"); connection.close(); } //... } public class ReceiveLogsTopic { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "topic"); String queueName = channel.queueDeclare().getQueue(); if (argv.length < 1){ System.err.println("Usage: ReceiveLogsTopic [binding_key]..."); System.exit(1); } for(String bindingKey : argv){ channel.queueBind(queueName, EXCHANGE_NAME, bindingKey); } System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); QueueingConsumer consumer = new QueueingConsumer(channel); channel.basicConsume(queueName, true, consumer); while (true) { QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); String routingKey = delivery.getEnvelope().getRoutingKey(); System.out.println(" [x] Received '" + routingKey + "':'" + message + "'"); } } }
集群方案:
一般采用镜像队列,内存节点作为主服务器+磁盘节点做冗余。
相关推荐
rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ学习笔记rabbitMQ...
rabbitmq学习笔记和软件和插件,尚硅谷的学习笔记和软件
rabbitMQ 学习笔记
rabbitmq学习笔记.zip
RabbitMq学习笔记1
RabbitMq是实现AMQP(高级消息队列协议)的消息中间件的一种,与Springboot整合后使用方便简单,功能强大。本文还列举了Springboot整合RabbitMq使用和原版对比、及一些坑,简单易懂。
RabbitMQ学习笔记
RabbitMQ学习笔记:Connections、Channels、Queues之state状态连接、信道、队列状态如下:GitHub地址:https://gi
我自己的学习笔记,包括python、php、.net的实现,服务的搭建;具体原理的等,80多页,非常细致,学完保证对rabbitmq门儿清!我们公司的rabbit服务就是我搭建的
RabbitMQ学习笔记 RabbitMQ学习笔记 RabbitMQ学习笔记 RabbitMQ学习笔记 RabbitMQ学习笔记 RabbitMQ学习笔记
The default behaviour for RabbitMQ when a maximum queue length or size is set an
RabbitMQ的学习笔记,记录了消息中间件的相关知识点和RabbitMQ基础操作
结合狂神RabbitMQ笔记,外加自己查阅资料
rabbitMQ学习的笔记和学习过程中写的代码
RabbitMQ学习讲义,可以为您讲解基础的AMQP协议。以及用Java语言如何操作RabbitMQ。
初步学习rabbitmq,对其下载配置,学习使用进行一个粗粒度的记载
适合快速了解rabbitmq 的小白,大佬绕过,就是简单地总结一下自己不会的,一些常见的rabbitmq 比较容易遗漏的学习笔记,供大家学习,让自己方便记忆。
内容概要:在尚硅谷rabbitmq学习的过程中,根据课程原有资料,整理了自己的md格式的笔记,包括了原教程全部代码的实现(均已经跑通),相对原教程提供的资料有一些补充,同时有一些纠错,以及一些课外的引申、学习...
《RabbitMQ实战-高效部署分布式消息队列》学习的总结,测试api程序主要使用python语言
学习RabbitMQ的学习笔记