两种安装方式
1 原生安装
-安装扩展epel源
-yum -y install erlang
-yum -y install rabbitmq-server
-systemctl start rabbitmq-server
2 docker拉取
-docker pull rabbitmq:management(自动开启了web管理界面)
-docker run -di --name rabbitmq -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 rabbitmq:management
3 5672:是rabbitmq的默认端口
15672:web管理界面的端口
4 创建用户
rabbitmqctl add_user aaa 123
5 分配权限
rabbitmqctl set_user_tags aaa administrator
rabbitmqctl set_permissions -p "/" aaa ".*" ".*" ".*"
基本使用
# 生产者
# pika
# pip3 install pika
import pika
# 拿到连接对象
# connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166'))
# 有用户名密码的情况
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
# 拿到channel对象
channel = connection.channel()
# 声明一个队列
channel.queue_declare(queue='hello') # 指定队列名字
# 生产者向队列中放一条消息
channel.basic_publish(exchange='',
routing_key='hello',
body='aaa js nb')
print(" Sent 'Hello World!'")
# 关闭连接
connection.close()
# 消费者
import pika, sys, os
def main():
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='101.133.225.166'))
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='hello')
def callback(ch, method, properties, body):
print(" Received %r" % body)
channel.basic_consume(queue='hello', on_message_callback=callback, auto_ack=True)
channel.start_consuming()
if __name__ == '__main__':
main()
消息确认机制
# 生产者
# pika
# pip3 install pika
import pika
# 拿到连接对象
# connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166'))
# 有用户名密码的情况
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
# 拿到channel对象
channel = connection.channel()
# 声明一个队列
channel.queue_declare(queue='aaa') # 指定队列名字
# 生产者向队列中放一条消息
channel.basic_publish(exchange='',
routing_key='aaa',
body='aaa jssss nb')
print(" aaa jssss nb'")
# 关闭连接
connection.close()
# 消费者
import pika, sys, os
def main():
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='101.133.225.166'))
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='aaa')
def callback(ch, method, properties, body):
print(" Received %r" % body)
# 真正的消息处理完了,再发确认
ch.basic_ack(delivery_tag=method.delivery_tag)
## 不会自动回复确认消息,
## auto_ack=True,队列收到确认,就会自动把消费过的消息删除
channel.basic_consume(queue='aaa', on_message_callback=callback, auto_ack=False)
channel.start_consuming()
if __name__ == '__main__':
main()
持久化
#在声明队列时,指定持久化
channel.queue_declare(queue='aaa_new',durable=True)
# 声明消息持久化
在发布消息的时候,
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
## 生产者
# pika
# pip3 install pika
import pika
# 拿到连接对象
# connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166'))
# 有用户名密码的情况
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
# 拿到channel对象
channel = connection.channel()
# 声明一个队列
channel.queue_declare(queue='aaa_new',durable=True) # 指定队列名字
# 生产者向队列中放一条消息
channel.basic_publish(exchange='',
routing_key='aaa_new',
body='aaa jssss nb',
properties=pika.BasicProperties(
delivery_mode=2, # make message persistent
)
)
print(" aaa jssss nb'")
# 关闭连接
connection.close()
### 消费者
import pika, sys, os
def main():
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='101.133.225.166'))
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='aaa_new',durable=True)
def callback(ch, method, properties, body):
print("Received %r" % body)
# 真正的消息处理完了,再发确认
ch.basic_ack(delivery_tag=method.delivery_tag)
## 不会自动回复确认消息,
## auto_ack=True,队列收到确认,就会自动把消费过的消息删除
channel.basic_consume(queue='aaa_new', on_message_callback=callback, auto_ack=False)
channel.start_consuming()
if __name__ == '__main__':
main()
闲置消费
#就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来
channel.basic_qos(prefetch_count=1)
## 生产者
# pika
# pip3 install pika
import pika
# 拿到连接对象
# connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166'))
# 有用户名密码的情况
credentials = pika.PlainCredentials("admin","admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166',credentials=credentials))
# 拿到channel对象
channel = connection.channel()
# 声明一个队列
channel.queue_declare(queue='aaa') # 指定队列名字
# 生产者向队列中放一条消息
channel.basic_publish(exchange='',
routing_key='aaa',
body='aaa jssss nb')
print(" aaa jssss nb'")
# 关闭连接
connection.close()
### 消费者1
import pika, sys, os
def main():
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='101.133.225.166'))
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='aaa')
def callback(ch, method, properties, body):
import time
time.sleep(50)
print(" Received %r" % body)
# 真正的消息处理完了,再发确认
ch.basic_ack(delivery_tag=method.delivery_tag)
## 不会自动回复确认消息,
## auto_ack=True,队列收到确认,就会自动把消费过的消息删除
channel.basic_qos(prefetch_count=1) #####就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来
channel.basic_consume(queue='aaa', on_message_callback=callback, auto_ack=False)
channel.start_consuming()
if __name__ == '__main__':
main()
###消费者2
import pika, sys, os
def main():
# connection = pika.BlockingConnection(pika.ConnectionParameters(host='101.133.225.166'))
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='aaa')
def callback(ch, method, properties, body):
print("Received %r" % body)
# 真正的消息处理完了,再发确认
ch.basic_ack(delivery_tag=method.delivery_tag)
## 不会自动回复确认消息,
## auto_ack=True,队列收到确认,就会自动把消费过的消息删除
channel.basic_qos(prefetch_count=1) #####就只有这一句话 谁闲置谁获取,没必要按照顺序一个一个来
channel.basic_consume(queue='aaa', on_message_callback=callback, auto_ack=False)
channel.start_consuming()
if __name__ == '__main__':
main()
发布订阅
## 发布者
import pika
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()
# 声明队列没有指定名字,指定了exchange
channel.exchange_declare(exchange='logs', exchange_type='fanout')
message = "info: Hello World!"
channel.basic_publish(exchange='logs', routing_key='', body=message)
print(" Sent %r" % message)
connection.close()
## 订阅者(启动多次,会创建出多个队列,都绑定到了同一个exchange上)
import pika
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='logs', exchange_type='fanout')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)
channel.queue_bind(exchange='logs', queue=queue_name)
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print("%r" % body)
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
关键字
### 发布者
import pika
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()
# 声明队列没有指定名字,指定了exchange
channel.exchange_declare(exchange='aaa123', exchange_type='direct')
message = "info: asdfasdfasdfsadfasdf World!"
channel.basic_publish(exchange='aaa123', routing_key='bnb', body=message)
print(" Sent %r" % message)
connection.close()
### 订阅者1
import pika
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='aaa123', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)
channel.queue_bind(exchange='aaa123', queue=queue_name,routing_key='nb')
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" %r" % body)
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
####订阅者2
import pika
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='aaa123', exchange_type='direct')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)
channel.queue_bind(exchange='aaa123', queue=queue_name,routing_key='nb')
channel.queue_bind(exchange='aaa123', queue=queue_name,routing_key='bnb')
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print("%r" % body)
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
模糊匹配
# 表示后面可以跟任意字符
*表示后面只能跟一个单词
###发布者
import pika
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()
# 声明队列没有指定名字,指定了exchange
channel.exchange_declare(exchange='m3', exchange_type='topic')
message = "info: asdfasdfasdfsadfasdf World!"
channel.basic_publish(exchange='m3', routing_key='aaa.dd', body=message)
print(" Sent %r" % message)
connection.close()
### 订阅者1
import pika
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='m3', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)
channel.queue_bind(exchange='m3', queue=queue_name,routing_key='aaa.*')
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" %r" % body)
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
###订阅者2
import pika
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()
channel.exchange_declare(exchange='m3', exchange_type='topic')
result = channel.queue_declare(queue='', exclusive=True)
queue_name = result.method.queue
print(queue_name)
channel.queue_bind(exchange='m3', queue=queue_name,routing_key='aaa.#')
print(' [*] Waiting for logs. To exit press CTRL+C')
def callback(ch, method, properties, body):
print(" %r" % body)
channel.basic_consume(
queue=queue_name, on_message_callback=callback, auto_ack=True)
channel.start_consuming()
通过rabbitmq实现rpc
### 服务端
import pika
credentials = pika.PlainCredentials("admin", "admin")
connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=credentials))
channel = connection.channel()
channel.queue_declare(queue='rpc_queue')
def fib(n):
if n == 0:
return 0
elif n == 1:
return 1
else:
return fib(n - 1) + fib(n - 2)
def on_request(ch, method, props, body):
n = int(body)
print(" [.] fib(%s)" % n)
response = fib(n)
ch.basic_publish(exchange='',
routing_key=props.reply_to,
properties=pika.BasicProperties(correlation_id = \
props.correlation_id),
body=str(response))
ch.basic_ack(delivery_tag=method.delivery_tag)
channel.basic_qos(prefetch_count=1)
channel.basic_consume(queue='rpc_queue', on_message_callback=on_request)
print(" Awaiting RPC requests")
channel.start_consuming()
## 客户端
import pika
import uuid
class FibonacciRpcClient(object):
def __init__(self):
self.credentials = pika.PlainCredentials("admin", "admin")
self.connection = pika.BlockingConnection(pika.ConnectionParameters('101.133.225.166', credentials=self.credentials))
self.channel = self.connection.channel()
result = self.channel.queue_declare(queue='', exclusive=True)
self.callback_queue = result.method.queue
self.channel.basic_consume(
queue=self.callback_queue,
on_message_callback=self.on_response,
auto_ack=True)
def on_response(self, ch, method, props, body):
if self.corr_id == props.correlation_id:
self.response = body
def call(self, n):
self.response = None
self.corr_id = str(uuid.uuid4())
self.channel.basic_publish(
exchange='',
routing_key='rpc_queue',
properties=pika.BasicProperties(
reply_to=self.callback_queue,
correlation_id=self.corr_id,
),
body=str(n))
while self.response is None:
self.connection.process_data_events()
return int(self.response)
fibonacci_rpc = FibonacciRpcClient()
print("Requesting fib(30)")
response = fibonacci_rpc.call(10) # 外界看上去,就像调用本地的call()函数一样
print(" [.] Got %r" % response)
python中的rpc框架
SimpleXMLRPCServer
### 服务端
from xmlrpc.server import SimpleXMLRPCServer
class RPCServer(object):
def __init__(self):
super(RPCServer, self).__init__()
print(self)
self.send_data = {'server:'+str(i): i for i in range(100)}
self.recv_data = None
def getObj(self):
print('get data')
return self.send_data
def sendObj(self, data):
print('send data')
self.recv_data = data
print(self.recv_data)
# SimpleXMLRPCServer
server = SimpleXMLRPCServer(('localhost',4242), allow_none=True)
server.register_introspection_functions()
server.register_instance(RPCServer())
server.serve_forever()
### 客户端
import time
from xmlrpc.client import ServerProxy
# SimpleXMLRPCServer
def xmlrpc_client():
print('xmlrpc client')
c = ServerProxy('http://localhost:4242')
data = {'client:'+str(i): i for i in range(100)}
start = time.clock()
for i in range(50):
a=c.getObj()
print(a)
for i in range(50):
c.sendObj(data)
print('xmlrpc total time %s' % (time.clock() - start))
if __name__ == '__main__':
xmlrpc_client()
ZeroRPC实现rpc
### 服务端
import zerorpc
class RPCServer(object):
def __init__(self):
super(RPCServer, self).__init__()
print(self)
self.send_data = {'server:'+str(i): i for i in range(100)}
self.recv_data = None
def getObj(self):
print('get data')
return self.send_data
def sendObj(self, data):
print('send data')
self.recv_data = data
print(self.recv_data)
# zerorpc
s = zerorpc.Server(RPCServer())
s.bind('tcp://0.0.0.0:4243')
s.run()
### 客户端
import zerorpc
import time
# zerorpc
def zerorpc_client():
print('zerorpc client')
c = zerorpc.Client()
c.connect('tcp://127.0.0.1:4243')
data = {'client:'+str(i): i for i in range(100)}
start = time.clock()
for i in range(500):
a=c.getObj()
print(a)
for i in range(500):
c.sendObj(data)
print('total time %s' % (time.clock() - start))
if __name__ == '__main__':
zerorpc_client()