API¶
Configuration¶
-
class
flask_coney.
Coney
(app=None, testing=False)¶ This class is used to control the Coney integration to one or more Flask applications. Depending on how you initialize the object it is usable right away or will attach as needed to a Flask application.
There are two usage modes which work very similarly. One is binding the instance to a very specific Flask application:
app = Flask(__name__) coney = Coney(app)
The second possibility is to create the object once and configure the application later to support it:
coney = Coney() def create_app(): app = Flask(__name__) coney.init_app(app) return app
To listen on a queue use:
coney = Coney(app) @coney.queue(queue_name="test") def queue_test(ch, method, props, body): pass
To publish a message use:
coney = Coney(app) coney.publish({"test": 1})
- Parameters
-
channel
(app=None)¶ Provides context for a channel.
Example:
with channel(app) as ch: ch.basic_publish()
-
connection
(app=None)¶ Provides context for a connection.
Example:
with connection(app) as c: c.channel()
- Parameters
app (
Optional
[Flask
]) – A flask app- Return type
-
get_app
(reference_app=None)¶ Helper method that implements the logic to look up an application.
- Parameters
reference_app (
Optional
[Flask
]) – A flask app
-
init_app
(app)¶ This callback can be used to initialize an application for the use with Coney.
-
publish
(body, exchange_name='', routing_key='', durable=False, properties=None, app=None)¶ Will publish a message
Example:
@app.route('/process'): def process(): coney.publish({"text": "process me"})
-
publish_sync
(body, exchange_name='', routing_key='', properties=None, timeout=10, app=None)¶ Will publish a message and wait for the response
Example:
# client @app.route('/concat') def concat(): a = request.args.get('a') b = request.args.get('b') body = {'a': a, 'b': b} result = coney.publish_sync(body, routing_key="rpc") return result # server @queue(queue_name="rpc") def concat_callback(ch, method, props, body): result = body["a"] + body["b"] body = {"result": result} coney.reply_sync(ch, method, props, body)
- Parameters
- Raises
SyncTimeoutError: if no message received in timeout
-
queue
(queue_name='', exchange_name='', exchange_type=<ExchangeType.DIRECT: 'direct'>, routing_key=None, routing_keys=None, app=None)¶ A decorator for consuming a queue. A thread will start in the background, if no other thread for this purpose was already started. There will only be one thread for every queue.
Example:
@coney.queue(queue_name="test") def queue_test(ch, method, props, body): pass
You can also bind the queue to multiple routing keys:
@coney.queue(routing_keys=["first", "second"]) def queue_multiple(ch, method, props, body): pass
If routing_keys and a routing_key is provided, they will be combined.
- Parameters
- Return type
-
reply_sync
(ch, method, properties, body, app=None)¶ Will reply to a message, which was send by
publish_sync()
Example:
@queue(queue_name="rpc") def concat_callback(ch, method, props, body): result = body["a"] + body["b"] body = {"result": result} coney.reply_sync(ch, method, props, body)
This is a conveniences short hand method for:
@queue(queue_name="rpc") def concat_callback(ch, method, props, body): result = body["a"] + body["b"] body = {"result": result} self.publish( body, routing_key=properties.reply_to, properties={"correlation_id": properties.correlation_id}, app=app, )
- Parameters
ch (
Channel
) –method (
Deliver
) –properties (
BasicProperties
) –body (
str
) – The message to send