介绍在使用消息队列(MQ)时,生产者需要配置和传递的具体参数,分析这些参数在消息发送过程中的作用。针对不同MQ系统(如RabbitMQ、Kafka、ActiveMQ),描述常见的生产者配置参数和传递选项
chou403
/ MQ
/ c:
/ u:
/ 4 min read
mq 生产者具体参数是什么传参都有哪些
消息队列(Message Queue, MQ)生产者的具体参数和传参会因所使用的消息队列系统(如 Apache Kafka, RabbitMQ, Apache ActiveMQ 等)而有所不同。下面我将介绍几个常见消息队列系统的生产者参数和传参。
Apache Kafka 生产者参数
在 Kafka 中,生产者参数可以通过 Properties
对象来配置,这些参数控制了消息发送的行为,性能和可靠性。
常见参数
bootstrap.servers
: Kafka 集群的地址,格式为host1:port1,host2:port2,…
。key.serializer
: 键的序列化器类。value.serializer
: 值的序列化器类。acks
: 指定了生产者在收到服务器完成请求的确认之前发送的记录将被认为已发送的次数。acks=0
: 生产者不会等待服务器的确认。acks=1
: leader 写入日志后即确认。acks=all
: 所有同步副本确认。
retries
: 发送失败时重试的次数。batch.size
: 生产者将尝试批量处理消息记录的大小,以字节为单位。linger.ms
: 生产者在发送一批消息之前等待更多记录加入批次的时间。buffer.memory
: 生产者用来缓冲等待发送到服务器的消息的内存总字节数。
Kafka示例代码
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");
props.put("retries", 3);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
}
producer.close();
RabbitMQ 生产者参数
在 RabbitMQ 中,生产者使用的参数主要通过 AMQP 协议配置,并在发送消息时指定。
RabbitMQ常见参数
host
: RabbitMQ 服务器的地址。port
: RabbitMQ 服务器的端口,默认 5672。username
: 登录 RabbitMQ 的用户名。password
: 登录 RabbitMQ 的密码。virtualHost
: 虚拟主机。exchange
: 交换机名称。routingKey
: 路由键。mandatory
: 如果为 true, 当 RabbitMQ 不能根据自身的 Exchange 类型和路由键找到一个符合条件的 Queue 时,会将消息返回给生产者。immediate
: 已被弃用,不再推荐使用。
RabbitMQ示例代码
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setPort(5672);
factory.setUsername("guest");
factory.setPassword("guest");
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
String exchangeName = "my-exchange";
String routingKey = "my-routing-key";
String message = "Hello, RabbitMQ!";
channel.basicPublish(exchangeName, routingKey, null, message.getBytes("UTF-8"));
System.out.println(" [x] Sent '" + message + "'");
}
Apache ActiveMQ 生产者参数
在 ActiveMQ 中,生产者参数通过 JMS API 配置,通常在创建连接和会话时指定。
ActiveMQ常见参数
brokerURL
: ActiveMQ Broker 的地址,例如tcp://localhost:61616
。queueName
: 目标队列名称。deliveryMode
: 消息的持久性(DeliveryMode.NON_PERSISTENT
或DeliveryMode.PERSISTENT
)。priority
: 消息优先级。timeToLive
: 消息过期时间。
ActiveMQ示例代码
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
Connection connection = connectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Destination destination = session.createQueue("my-queue");
MessageProducer producer = session.createProducer(destination);
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
producer.send(message);
System.out.println("Sent message: " + message.getText());
producer.close();
session.close();
connection.close();
这些是常见消息队列系统生产者的一些主要参数和使用示例。具体的参数配置可能会根据实际需求进行调整和优化。