Patterns¶
Flask-Coney can be used to archive different messaging patterns. A few typical patterns, found on RabbitMQ Tutorials, will be displayed here.
Hallo World¶
In this example we have two services (aka flask applications). Service 1 will receive data from a http post request. It will send this data to the message broker with the routing_key “process”. Service 2 will pick up any message with the routing key “process”, process and store the data. Service 2 also provides an api endpoint, which allows a user to request the processed data.
Service 1¶
from flask import Flask, request
from flask_coney import Coney
app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)
@app.route("/process", methods=["POST"])
def process():
data = request.get_json()
# validation ...
coney.publish(data, routing_key="process")
return "will be processed, check service2"
Serivce 2¶
from flask import Flask
from flask_coney import Coney
app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)
@coney.queue(queue_name="process")
def process_queue(ch, method, props, body):
# do something with body
print(body, flush=True)
Work queues¶
In this example service1 publishes a message. This message it then either processed by service2 or service3, depending on how reads the message first. This can be useful, if you want to split up the workload.
Service 1¶
from flask import Flask
from flask_coney import Coney
app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)
Service 2¶
from flask import Flask
from flask_coney import Coney
app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)
Service 3¶
from flask import Flask
from flask_coney import Coney
app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)
Publish/Subscribe¶
In this example service1 publishes a message to an exchange. Because it is a fanout exchange all queues will receive a copy of this message. Thus, service2 and service3 will process the message.
Service 1¶
from flask import Flask
from flask_coney import Coney
app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)
@app.route("/pub", methods=["POST"])
def pub():
coney.publish("A message", exchange_name="notify", routing_key="")
return "It will be process by service2 and service3"
Service 2¶
from flask import Flask
from flask_coney import Coney, ExchangeType
app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)
@coney.queue(
queue_name="service2.notify",
exchange_name="notify",
exchange_type=ExchangeType.FANOUT,
)
def notify_queue(ch, method, props, body):
print(body, flush=True)
Service 3¶
from flask import Flask
from flask_coney import Coney, ExchangeType
app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)
@coney.queue(
queue_name="service3.notify",
exchange_name="notify",
exchange_type=ExchangeType.FANOUT,
)
def notify_queue(ch, method, props, body):
print(body, flush=True)
Routing¶
Service 1¶
from flask import Flask
from flask_coney import Coney
app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)
@app.route("/warning", methods=["POST"])
def warning():
coney.publish("This is a warning", exchange_name="logs", routing_key="warning")
return "Warning published"
@app.route("/error", methods=["POST"])
def error():
coney.publish("This is an error", exchange_name="logs", routing_key="error")
return "Error published"
@app.route("/info", methods=["POST"])
def info():
coney.publish("This is an info", exchange_name="logs", routing_key="info")
return "Info published"
Service 2¶
from flask import Flask
from flask_coney import Coney
app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)
@coney.queue(queue_name="logs_error", exchange_name="logs", routing_key="error")
def error_queue(ch, method, props, body):
print(body, flush=True)
@coney.queue(queue_name="logs_warning", exchange_name="logs", routing_key="warning")
def warning_queue(ch, method, props, body):
print(body, flush=True)
Service 3¶
from flask import Flask
from flask_coney import Coney
app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)
@coney.queue(queue_name="logs_info", exchange_name="logs", routing_key="info")
def info_queue(ch, method, props, body):
print(body, flush=True)
Topics¶
Service 1¶
from flask import Flask
from flask_coney import Coney
app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)
Service 2¶
from flask import Flask
from flask_coney import Coney
app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)
Service 3¶
from flask import Flask
from flask_coney import Coney
app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)
Request/Reply¶
Service 1¶
from flask import Flask
from flask_coney import Coney
app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)
@app.route("/rpc", methods=["GET"])
def rpc():
response = coney.publish_sync("Hi", routing_key="rpc")
return response
Service 2¶
from flask import Flask
from flask_coney import Coney
app = Flask(__name__)
app.config["CONEY_BROKER_URI"] = "amqp://guest:guest@rabbitmq"
coney = Coney(app)
@coney.queue(queue_name="rpc")
def rpc_queue(ch, method, props, body):
result = f"{body.decode('utf-8')} touched by me"
print(result, flush=True)
coney.reply_sync(ch, method, props, result)