Merge "Add C demo agent"
This commit is contained in:
commit
bb89406f12
|
@ -0,0 +1,38 @@
|
|||
# Makefile for murano agent
|
||||
|
||||
CFLAGS= -Wall
|
||||
|
||||
default_target: all
|
||||
|
||||
|
||||
all: agent-binary producer-binary
|
||||
|
||||
|
||||
# NOTE: Use this if RabbitMQ needs to be linked in as a dynamic library
|
||||
#agent-binary: murano-agent.o lcfg_static.o utils.o
|
||||
# gcc -o murano-agent murano-agent.o lcfg_static.o utils.o -lrabbitmq -lrt
|
||||
|
||||
agent-binary: murano-agent.o lcfg_static.o utils.o librabbitmq.a
|
||||
gcc $(CFLAGS) -o murano-agent murano-agent.o lcfg_static.o utils.o -L. -lrabbitmq -lrt
|
||||
|
||||
# NOTE: Use this if RabbitMQ needs to be linked in as a dynamic library
|
||||
#producer-binary: producer.o utils.o
|
||||
# gcc -o producer producer.o utils.o -lrabbitmq -lrt
|
||||
|
||||
producer-binary: producer.o utils.o librabbitmq.a
|
||||
gcc $(CFLAGS) -o producer producer.o utils.o -L. -lrabbitmq -lrt
|
||||
|
||||
murano-agent.o: murano-agent.c
|
||||
gcc $(CFLAGS) -c -I. -fPIC murano-agent.c
|
||||
|
||||
producer.o: producer.c
|
||||
gcc $(CFLAGS) -c -I. -fPIC producer.c
|
||||
|
||||
lcfg_static.o: lcfg_static.c
|
||||
gcc $(CFLAGS) -c -fPIC lcfg_static.c
|
||||
|
||||
utils.o: utils.c
|
||||
gcc $(CFLAGS) -c -I. -fPIC utils.c
|
||||
|
||||
clean:
|
||||
rm -f *.o murano-agent producer
|
|
@ -0,0 +1,64 @@
|
|||
This is a demo implementation of Murano agent.
|
||||
|
||||
=== The scenario ===
|
||||
|
||||
Effectively the agent only connects to an AMQP queue
|
||||
using properties from the configuration file, consumes
|
||||
all the messages from the queue, logs them and puts
|
||||
result messages into a result queue. Result messages
|
||||
are just stubs signaling success back to the calling
|
||||
application.
|
||||
|
||||
=== Building binaries ===
|
||||
|
||||
Build process has been tested on Ubuntu 13.04 only for
|
||||
x86_64 architecture. Note that the distribution contains
|
||||
RabbitMQ client pre-built as a static library. It gets
|
||||
linked with demo agent object files statically. It was
|
||||
done to eliminate the need to build and install RabbitMQ
|
||||
client library separately in case of using Ubuntu 64 bit.
|
||||
|
||||
In order to build demo agent for embedded operating systems
|
||||
based on BusyBox technology (such as Cirros) refer to
|
||||
Buildroot toolkit (http://buildroot.uclibc.org/).
|
||||
|
||||
To build agent binary:
|
||||
make agent-binary
|
||||
|
||||
To build test producer binary:
|
||||
make prodcer-binary
|
||||
|
||||
To build both binaries:
|
||||
make
|
||||
|
||||
To clean up the source directory:
|
||||
make clean
|
||||
|
||||
=== Usage ===
|
||||
|
||||
To run agent:
|
||||
./murano-agent <config_file> <log_file>
|
||||
|
||||
<config_file>:
|
||||
Refer to murano-agent.conf.example file to find out
|
||||
what configuration properties are possible.
|
||||
|
||||
<log_file>:
|
||||
Path to log file which agent writes the messages into.
|
||||
|
||||
To run test message producer:
|
||||
./producer <host> <port> <message_rate> <message_count>
|
||||
|
||||
<host>:
|
||||
RabbitMQ server host for the producer to connect to.
|
||||
|
||||
<port>:
|
||||
RabbitMQ server port for the producer to connect to.
|
||||
|
||||
<message_rate>:
|
||||
Message rate at which the producer sends messages
|
||||
(delay in seconds between messages).
|
||||
|
||||
<message_count>:
|
||||
A number of messages the producer should send.
|
||||
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,630 @@
|
|||
/* Generated code. Do not edit. Edit and re-run codegen.py instead.
|
||||
*
|
||||
* ***** BEGIN LICENSE BLOCK *****
|
||||
* Version: MIT
|
||||
*
|
||||
* Portions created by Alan Antonuk are Copyright (c) 2012-2013
|
||||
* Alan Antonuk. All Rights Reserved.
|
||||
*
|
||||
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
|
||||
* All Rights Reserved.
|
||||
*
|
||||
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
|
||||
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person
|
||||
* obtaining a copy of this software and associated documentation
|
||||
* files (the "Software"), to deal in the Software without
|
||||
* restriction, including without limitation the rights to use, copy,
|
||||
* modify, merge, publish, distribute, sublicense, and/or sell copies
|
||||
* of the Software, and to permit persons to whom the Software is
|
||||
* furnished to do so, subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be
|
||||
* included in all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
|
||||
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
|
||||
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
|
||||
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
|
||||
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
|
||||
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
|
||||
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
* SOFTWARE.
|
||||
* ***** END LICENSE BLOCK *****
|
||||
*/
|
||||
|
||||
#ifndef AMQP_FRAMING_H
|
||||
#define AMQP_FRAMING_H
|
||||
|
||||
#include <amqp.h>
|
||||
|
||||
AMQP_BEGIN_DECLS
|
||||
|
||||
#define AMQP_PROTOCOL_VERSION_MAJOR 0
|
||||
#define AMQP_PROTOCOL_VERSION_MINOR 9
|
||||
#define AMQP_PROTOCOL_VERSION_REVISION 1
|
||||
#define AMQP_PROTOCOL_PORT 5672
|
||||
#define AMQP_FRAME_METHOD 1
|
||||
#define AMQP_FRAME_HEADER 2
|
||||
#define AMQP_FRAME_BODY 3
|
||||
#define AMQP_FRAME_HEARTBEAT 8
|
||||
#define AMQP_FRAME_MIN_SIZE 4096
|
||||
#define AMQP_FRAME_END 206
|
||||
#define AMQP_REPLY_SUCCESS 200
|
||||
#define AMQP_CONTENT_TOO_LARGE 311
|
||||
#define AMQP_NO_ROUTE 312
|
||||
#define AMQP_NO_CONSUMERS 313
|
||||
#define AMQP_ACCESS_REFUSED 403
|
||||
#define AMQP_NOT_FOUND 404
|
||||
#define AMQP_RESOURCE_LOCKED 405
|
||||
#define AMQP_PRECONDITION_FAILED 406
|
||||
#define AMQP_CONNECTION_FORCED 320
|
||||
#define AMQP_INVALID_PATH 402
|
||||
#define AMQP_FRAME_ERROR 501
|
||||
#define AMQP_SYNTAX_ERROR 502
|
||||
#define AMQP_COMMAND_INVALID 503
|
||||
#define AMQP_CHANNEL_ERROR 504
|
||||
#define AMQP_UNEXPECTED_FRAME 505
|
||||
#define AMQP_RESOURCE_ERROR 506
|
||||
#define AMQP_NOT_ALLOWED 530
|
||||
#define AMQP_NOT_IMPLEMENTED 540
|
||||
#define AMQP_INTERNAL_ERROR 541
|
||||
|
||||
/* Function prototypes. */
|
||||
|
||||
AMQP_PUBLIC_FUNCTION
|
||||
char const *
|
||||
AMQP_CALL amqp_constant_name(int constantNumber);
|
||||
|
||||
AMQP_PUBLIC_FUNCTION
|
||||
amqp_boolean_t
|
||||
AMQP_CALL amqp_constant_is_hard_error(int constantNumber);
|
||||
|
||||
AMQP_PUBLIC_FUNCTION
|
||||
char const *
|
||||
AMQP_CALL amqp_method_name(amqp_method_number_t methodNumber);
|
||||
|
||||
AMQP_PUBLIC_FUNCTION
|
||||
amqp_boolean_t
|
||||
AMQP_CALL amqp_method_has_content(amqp_method_number_t methodNumber);
|
||||
|
||||
AMQP_PUBLIC_FUNCTION
|
||||
int
|
||||
AMQP_CALL amqp_decode_method(amqp_method_number_t methodNumber,
|
||||
amqp_pool_t *pool,
|
||||
amqp_bytes_t encoded,
|
||||
void **decoded);
|
||||
|
||||
AMQP_PUBLIC_FUNCTION
|
||||
int
|
||||
AMQP_CALL amqp_decode_properties(uint16_t class_id,
|
||||
amqp_pool_t *pool,
|
||||
amqp_bytes_t encoded,
|
||||
void **decoded);
|
||||
|
||||
AMQP_PUBLIC_FUNCTION
|
||||
int
|
||||
AMQP_CALL amqp_encode_method(amqp_method_number_t methodNumber,
|
||||
void *decoded,
|
||||
amqp_bytes_t encoded);
|
||||
|
||||
AMQP_PUBLIC_FUNCTION
|
||||
int
|
||||
AMQP_CALL amqp_encode_properties(uint16_t class_id,
|
||||
void *decoded,
|
||||
amqp_bytes_t encoded);
|
||||
|
||||
/* Method field records. */
|
||||
|
||||
#define AMQP_CONNECTION_START_METHOD ((amqp_method_number_t) 0x000A000A) /* 10, 10; 655370 */
|
||||
typedef struct amqp_connection_start_t_ {
|
||||
uint8_t version_major;
|
||||
uint8_t version_minor;
|
||||
amqp_table_t server_properties;
|
||||
amqp_bytes_t mechanisms;
|
||||
amqp_bytes_t locales;
|
||||
} amqp_connection_start_t;
|
||||
|
||||
#define AMQP_CONNECTION_START_OK_METHOD ((amqp_method_number_t) 0x000A000B) /* 10, 11; 655371 */
|
||||
typedef struct amqp_connection_start_ok_t_ {
|
||||
amqp_table_t client_properties;
|
||||
amqp_bytes_t mechanism;
|
||||
amqp_bytes_t response;
|
||||
amqp_bytes_t locale;
|
||||
} amqp_connection_start_ok_t;
|
||||
|
||||
#define AMQP_CONNECTION_SECURE_METHOD ((amqp_method_number_t) 0x000A0014) /* 10, 20; 655380 */
|
||||
typedef struct amqp_connection_secure_t_ {
|
||||
amqp_bytes_t challenge;
|
||||
} amqp_connection_secure_t;
|
||||
|
||||
#define AMQP_CONNECTION_SECURE_OK_METHOD ((amqp_method_number_t) 0x000A0015) /* 10, 21; 655381 */
|
||||
typedef struct amqp_connection_secure_ok_t_ {
|
||||
amqp_bytes_t response;
|
||||
} amqp_connection_secure_ok_t;
|
||||
|
||||
#define AMQP_CONNECTION_TUNE_METHOD ((amqp_method_number_t) 0x000A001E) /* 10, 30; 655390 */
|
||||
typedef struct amqp_connection_tune_t_ {
|
||||
uint16_t channel_max;
|
||||
uint32_t frame_max;
|
||||
uint16_t heartbeat;
|
||||
} amqp_connection_tune_t;
|
||||
|
||||
#define AMQP_CONNECTION_TUNE_OK_METHOD ((amqp_method_number_t) 0x000A001F) /* 10, 31; 655391 */
|
||||
typedef struct amqp_connection_tune_ok_t_ {
|
||||
uint16_t channel_max;
|
||||
uint32_t frame_max;
|
||||
uint16_t heartbeat;
|
||||
} amqp_connection_tune_ok_t;
|
||||
|
||||
#define AMQP_CONNECTION_OPEN_METHOD ((amqp_method_number_t) 0x000A0028) /* 10, 40; 655400 */
|
||||
typedef struct amqp_connection_open_t_ {
|
||||
amqp_bytes_t virtual_host;
|
||||
amqp_bytes_t capabilities;
|
||||
amqp_boolean_t insist;
|
||||
} amqp_connection_open_t;
|
||||
|
||||
#define AMQP_CONNECTION_OPEN_OK_METHOD ((amqp_method_number_t) 0x000A0029) /* 10, 41; 655401 */
|
||||
typedef struct amqp_connection_open_ok_t_ {
|
||||
amqp_bytes_t known_hosts;
|
||||
} amqp_connection_open_ok_t;
|
||||
|
||||
#define AMQP_CONNECTION_CLOSE_METHOD ((amqp_method_number_t) 0x000A0032) /* 10, 50; 655410 */
|
||||
typedef struct amqp_connection_close_t_ {
|
||||
uint16_t reply_code;
|
||||
amqp_bytes_t reply_text;
|
||||
uint16_t class_id;
|
||||
uint16_t method_id;
|
||||
} amqp_connection_close_t;
|
||||
|
||||
#define AMQP_CONNECTION_CLOSE_OK_METHOD ((amqp_method_number_t) 0x000A0033) /* 10, 51; 655411 */
|
||||
typedef struct amqp_connection_close_ok_t_ {
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_connection_close_ok_t;
|
||||
|
||||
#define AMQP_CHANNEL_OPEN_METHOD ((amqp_method_number_t) 0x0014000A) /* 20, 10; 1310730 */
|
||||
typedef struct amqp_channel_open_t_ {
|
||||
amqp_bytes_t out_of_band;
|
||||
} amqp_channel_open_t;
|
||||
|
||||
#define AMQP_CHANNEL_OPEN_OK_METHOD ((amqp_method_number_t) 0x0014000B) /* 20, 11; 1310731 */
|
||||
typedef struct amqp_channel_open_ok_t_ {
|
||||
amqp_bytes_t channel_id;
|
||||
} amqp_channel_open_ok_t;
|
||||
|
||||
#define AMQP_CHANNEL_FLOW_METHOD ((amqp_method_number_t) 0x00140014) /* 20, 20; 1310740 */
|
||||
typedef struct amqp_channel_flow_t_ {
|
||||
amqp_boolean_t active;
|
||||
} amqp_channel_flow_t;
|
||||
|
||||
#define AMQP_CHANNEL_FLOW_OK_METHOD ((amqp_method_number_t) 0x00140015) /* 20, 21; 1310741 */
|
||||
typedef struct amqp_channel_flow_ok_t_ {
|
||||
amqp_boolean_t active;
|
||||
} amqp_channel_flow_ok_t;
|
||||
|
||||
#define AMQP_CHANNEL_CLOSE_METHOD ((amqp_method_number_t) 0x00140028) /* 20, 40; 1310760 */
|
||||
typedef struct amqp_channel_close_t_ {
|
||||
uint16_t reply_code;
|
||||
amqp_bytes_t reply_text;
|
||||
uint16_t class_id;
|
||||
uint16_t method_id;
|
||||
} amqp_channel_close_t;
|
||||
|
||||
#define AMQP_CHANNEL_CLOSE_OK_METHOD ((amqp_method_number_t) 0x00140029) /* 20, 41; 1310761 */
|
||||
typedef struct amqp_channel_close_ok_t_ {
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_channel_close_ok_t;
|
||||
|
||||
#define AMQP_ACCESS_REQUEST_METHOD ((amqp_method_number_t) 0x001E000A) /* 30, 10; 1966090 */
|
||||
typedef struct amqp_access_request_t_ {
|
||||
amqp_bytes_t realm;
|
||||
amqp_boolean_t exclusive;
|
||||
amqp_boolean_t passive;
|
||||
amqp_boolean_t active;
|
||||
amqp_boolean_t write;
|
||||
amqp_boolean_t read;
|
||||
} amqp_access_request_t;
|
||||
|
||||
#define AMQP_ACCESS_REQUEST_OK_METHOD ((amqp_method_number_t) 0x001E000B) /* 30, 11; 1966091 */
|
||||
typedef struct amqp_access_request_ok_t_ {
|
||||
uint16_t ticket;
|
||||
} amqp_access_request_ok_t;
|
||||
|
||||
#define AMQP_EXCHANGE_DECLARE_METHOD ((amqp_method_number_t) 0x0028000A) /* 40, 10; 2621450 */
|
||||
typedef struct amqp_exchange_declare_t_ {
|
||||
uint16_t ticket;
|
||||
amqp_bytes_t exchange;
|
||||
amqp_bytes_t type;
|
||||
amqp_boolean_t passive;
|
||||
amqp_boolean_t durable;
|
||||
amqp_boolean_t auto_delete;
|
||||
amqp_boolean_t internal;
|
||||
amqp_boolean_t nowait;
|
||||
amqp_table_t arguments;
|
||||
} amqp_exchange_declare_t;
|
||||
|
||||
#define AMQP_EXCHANGE_DECLARE_OK_METHOD ((amqp_method_number_t) 0x0028000B) /* 40, 11; 2621451 */
|
||||
typedef struct amqp_exchange_declare_ok_t_ {
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_exchange_declare_ok_t;
|
||||
|
||||
#define AMQP_EXCHANGE_DELETE_METHOD ((amqp_method_number_t) 0x00280014) /* 40, 20; 2621460 */
|
||||
typedef struct amqp_exchange_delete_t_ {
|
||||
uint16_t ticket;
|
||||
amqp_bytes_t exchange;
|
||||
amqp_boolean_t if_unused;
|
||||
amqp_boolean_t nowait;
|
||||
} amqp_exchange_delete_t;
|
||||
|
||||
#define AMQP_EXCHANGE_DELETE_OK_METHOD ((amqp_method_number_t) 0x00280015) /* 40, 21; 2621461 */
|
||||
typedef struct amqp_exchange_delete_ok_t_ {
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_exchange_delete_ok_t;
|
||||
|
||||
#define AMQP_EXCHANGE_BIND_METHOD ((amqp_method_number_t) 0x0028001E) /* 40, 30; 2621470 */
|
||||
typedef struct amqp_exchange_bind_t_ {
|
||||
uint16_t ticket;
|
||||
amqp_bytes_t destination;
|
||||
amqp_bytes_t source;
|
||||
amqp_bytes_t routing_key;
|
||||
amqp_boolean_t nowait;
|
||||
amqp_table_t arguments;
|
||||
} amqp_exchange_bind_t;
|
||||
|
||||
#define AMQP_EXCHANGE_BIND_OK_METHOD ((amqp_method_number_t) 0x0028001F) /* 40, 31; 2621471 */
|
||||
typedef struct amqp_exchange_bind_ok_t_ {
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_exchange_bind_ok_t;
|
||||
|
||||
#define AMQP_EXCHANGE_UNBIND_METHOD ((amqp_method_number_t) 0x00280028) /* 40, 40; 2621480 */
|
||||
typedef struct amqp_exchange_unbind_t_ {
|
||||
uint16_t ticket;
|
||||
amqp_bytes_t destination;
|
||||
amqp_bytes_t source;
|
||||
amqp_bytes_t routing_key;
|
||||
amqp_boolean_t nowait;
|
||||
amqp_table_t arguments;
|
||||
} amqp_exchange_unbind_t;
|
||||
|
||||
#define AMQP_EXCHANGE_UNBIND_OK_METHOD ((amqp_method_number_t) 0x00280033) /* 40, 51; 2621491 */
|
||||
typedef struct amqp_exchange_unbind_ok_t_ {
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_exchange_unbind_ok_t;
|
||||
|
||||
#define AMQP_QUEUE_DECLARE_METHOD ((amqp_method_number_t) 0x0032000A) /* 50, 10; 3276810 */
|
||||
typedef struct amqp_queue_declare_t_ {
|
||||
uint16_t ticket;
|
||||
amqp_bytes_t queue;
|
||||
amqp_boolean_t passive;
|
||||
amqp_boolean_t durable;
|
||||
amqp_boolean_t exclusive;
|
||||
amqp_boolean_t auto_delete;
|
||||
amqp_boolean_t nowait;
|
||||
amqp_table_t arguments;
|
||||
} amqp_queue_declare_t;
|
||||
|
||||
#define AMQP_QUEUE_DECLARE_OK_METHOD ((amqp_method_number_t) 0x0032000B) /* 50, 11; 3276811 */
|
||||
typedef struct amqp_queue_declare_ok_t_ {
|
||||
amqp_bytes_t queue;
|
||||
uint32_t message_count;
|
||||
uint32_t consumer_count;
|
||||
} amqp_queue_declare_ok_t;
|
||||
|
||||
#define AMQP_QUEUE_BIND_METHOD ((amqp_method_number_t) 0x00320014) /* 50, 20; 3276820 */
|
||||
typedef struct amqp_queue_bind_t_ {
|
||||
uint16_t ticket;
|
||||
amqp_bytes_t queue;
|
||||
amqp_bytes_t exchange;
|
||||
amqp_bytes_t routing_key;
|
||||
amqp_boolean_t nowait;
|
||||
amqp_table_t arguments;
|
||||
} amqp_queue_bind_t;
|
||||
|
||||
#define AMQP_QUEUE_BIND_OK_METHOD ((amqp_method_number_t) 0x00320015) /* 50, 21; 3276821 */
|
||||
typedef struct amqp_queue_bind_ok_t_ {
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_queue_bind_ok_t;
|
||||
|
||||
#define AMQP_QUEUE_PURGE_METHOD ((amqp_method_number_t) 0x0032001E) /* 50, 30; 3276830 */
|
||||
typedef struct amqp_queue_purge_t_ {
|
||||
uint16_t ticket;
|
||||
amqp_bytes_t queue;
|
||||
amqp_boolean_t nowait;
|
||||
} amqp_queue_purge_t;
|
||||
|
||||
#define AMQP_QUEUE_PURGE_OK_METHOD ((amqp_method_number_t) 0x0032001F) /* 50, 31; 3276831 */
|
||||
typedef struct amqp_queue_purge_ok_t_ {
|
||||
uint32_t message_count;
|
||||
} amqp_queue_purge_ok_t;
|
||||
|
||||
#define AMQP_QUEUE_DELETE_METHOD ((amqp_method_number_t) 0x00320028) /* 50, 40; 3276840 */
|
||||
typedef struct amqp_queue_delete_t_ {
|
||||
uint16_t ticket;
|
||||
amqp_bytes_t queue;
|
||||
amqp_boolean_t if_unused;
|
||||
amqp_boolean_t if_empty;
|
||||
amqp_boolean_t nowait;
|
||||
} amqp_queue_delete_t;
|
||||
|
||||
#define AMQP_QUEUE_DELETE_OK_METHOD ((amqp_method_number_t) 0x00320029) /* 50, 41; 3276841 */
|
||||
typedef struct amqp_queue_delete_ok_t_ {
|
||||
uint32_t message_count;
|
||||
} amqp_queue_delete_ok_t;
|
||||
|
||||
#define AMQP_QUEUE_UNBIND_METHOD ((amqp_method_number_t) 0x00320032) /* 50, 50; 3276850 */
|
||||
typedef struct amqp_queue_unbind_t_ {
|
||||
uint16_t ticket;
|
||||
amqp_bytes_t queue;
|
||||
amqp_bytes_t exchange;
|
||||
amqp_bytes_t routing_key;
|
||||
amqp_table_t arguments;
|
||||
} amqp_queue_unbind_t;
|
||||
|
||||
#define AMQP_QUEUE_UNBIND_OK_METHOD ((amqp_method_number_t) 0x00320033) /* 50, 51; 3276851 */
|
||||
typedef struct amqp_queue_unbind_ok_t_ {
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_queue_unbind_ok_t;
|
||||
|
||||
#define AMQP_BASIC_QOS_METHOD ((amqp_method_number_t) 0x003C000A) /* 60, 10; 3932170 */
|
||||
typedef struct amqp_basic_qos_t_ {
|
||||
uint32_t prefetch_size;
|
||||
uint16_t prefetch_count;
|
||||
amqp_boolean_t global;
|
||||
} amqp_basic_qos_t;
|
||||
|
||||
#define AMQP_BASIC_QOS_OK_METHOD ((amqp_method_number_t) 0x003C000B) /* 60, 11; 3932171 */
|
||||
typedef struct amqp_basic_qos_ok_t_ {
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_basic_qos_ok_t;
|
||||
|
||||
#define AMQP_BASIC_CONSUME_METHOD ((amqp_method_number_t) 0x003C0014) /* 60, 20; 3932180 */
|
||||
typedef struct amqp_basic_consume_t_ {
|
||||
uint16_t ticket;
|
||||
amqp_bytes_t queue;
|
||||
amqp_bytes_t consumer_tag;
|
||||
amqp_boolean_t no_local;
|
||||
amqp_boolean_t no_ack;
|
||||
amqp_boolean_t exclusive;
|
||||
amqp_boolean_t nowait;
|
||||
amqp_table_t arguments;
|
||||
} amqp_basic_consume_t;
|
||||
|
||||
#define AMQP_BASIC_CONSUME_OK_METHOD ((amqp_method_number_t) 0x003C0015) /* 60, 21; 3932181 */
|
||||
typedef struct amqp_basic_consume_ok_t_ {
|
||||
amqp_bytes_t consumer_tag;
|
||||
} amqp_basic_consume_ok_t;
|
||||
|
||||
#define AMQP_BASIC_CANCEL_METHOD ((amqp_method_number_t) 0x003C001E) /* 60, 30; 3932190 */
|
||||
typedef struct amqp_basic_cancel_t_ {
|
||||
amqp_bytes_t consumer_tag;
|
||||
amqp_boolean_t nowait;
|
||||
} amqp_basic_cancel_t;
|
||||
|
||||
#define AMQP_BASIC_CANCEL_OK_METHOD ((amqp_method_number_t) 0x003C001F) /* 60, 31; 3932191 */
|
||||
typedef struct amqp_basic_cancel_ok_t_ {
|
||||
amqp_bytes_t consumer_tag;
|
||||
} amqp_basic_cancel_ok_t;
|
||||
|
||||
#define AMQP_BASIC_PUBLISH_METHOD ((amqp_method_number_t) 0x003C0028) /* 60, 40; 3932200 */
|
||||
typedef struct amqp_basic_publish_t_ {
|
||||
uint16_t ticket;
|
||||
amqp_bytes_t exchange;
|
||||
amqp_bytes_t routing_key;
|
||||
amqp_boolean_t mandatory;
|
||||
amqp_boolean_t immediate;
|
||||
} amqp_basic_publish_t;
|
||||
|
||||
#define AMQP_BASIC_RETURN_METHOD ((amqp_method_number_t) 0x003C0032) /* 60, 50; 3932210 */
|
||||
typedef struct amqp_basic_return_t_ {
|
||||
uint16_t reply_code;
|
||||
amqp_bytes_t reply_text;
|
||||
amqp_bytes_t exchange;
|
||||
amqp_bytes_t routing_key;
|
||||
} amqp_basic_return_t;
|
||||
|
||||
#define AMQP_BASIC_DELIVER_METHOD ((amqp_method_number_t) 0x003C003C) /* 60, 60; 3932220 */
|
||||
typedef struct amqp_basic_deliver_t_ {
|
||||
amqp_bytes_t consumer_tag;
|
||||
uint64_t delivery_tag;
|
||||
amqp_boolean_t redelivered;
|
||||
amqp_bytes_t exchange;
|
||||
amqp_bytes_t routing_key;
|
||||
} amqp_basic_deliver_t;
|
||||
|
||||
#define AMQP_BASIC_GET_METHOD ((amqp_method_number_t) 0x003C0046) /* 60, 70; 3932230 */
|
||||
typedef struct amqp_basic_get_t_ {
|
||||
uint16_t ticket;
|
||||
amqp_bytes_t queue;
|
||||
amqp_boolean_t no_ack;
|
||||
} amqp_basic_get_t;
|
||||
|
||||
#define AMQP_BASIC_GET_OK_METHOD ((amqp_method_number_t) 0x003C0047) /* 60, 71; 3932231 */
|
||||
typedef struct amqp_basic_get_ok_t_ {
|
||||
uint64_t delivery_tag;
|
||||
amqp_boolean_t redelivered;
|
||||
amqp_bytes_t exchange;
|
||||
amqp_bytes_t routing_key;
|
||||
uint32_t message_count;
|
||||
} amqp_basic_get_ok_t;
|
||||
|
||||
#define AMQP_BASIC_GET_EMPTY_METHOD ((amqp_method_number_t) 0x003C0048) /* 60, 72; 3932232 */
|
||||
typedef struct amqp_basic_get_empty_t_ {
|
||||
amqp_bytes_t cluster_id;
|
||||
} amqp_basic_get_empty_t;
|
||||
|
||||
#define AMQP_BASIC_ACK_METHOD ((amqp_method_number_t) 0x003C0050) /* 60, 80; 3932240 */
|
||||
typedef struct amqp_basic_ack_t_ {
|
||||
uint64_t delivery_tag;
|
||||
amqp_boolean_t multiple;
|
||||
} amqp_basic_ack_t;
|
||||
|
||||
#define AMQP_BASIC_REJECT_METHOD ((amqp_method_number_t) 0x003C005A) /* 60, 90; 3932250 */
|
||||
typedef struct amqp_basic_reject_t_ {
|
||||
uint64_t delivery_tag;
|
||||
amqp_boolean_t requeue;
|
||||
} amqp_basic_reject_t;
|
||||
|
||||
#define AMQP_BASIC_RECOVER_ASYNC_METHOD ((amqp_method_number_t) 0x003C0064) /* 60, 100; 3932260 */
|
||||
typedef struct amqp_basic_recover_async_t_ {
|
||||
amqp_boolean_t requeue;
|
||||
} amqp_basic_recover_async_t;
|
||||
|
||||
#define AMQP_BASIC_RECOVER_METHOD ((amqp_method_number_t) 0x003C006E) /* 60, 110; 3932270 */
|
||||
typedef struct amqp_basic_recover_t_ {
|
||||
amqp_boolean_t requeue;
|
||||
} amqp_basic_recover_t;
|
||||
|
||||
#define AMQP_BASIC_RECOVER_OK_METHOD ((amqp_method_number_t) 0x003C006F) /* 60, 111; 3932271 */
|
||||
typedef struct amqp_basic_recover_ok_t_ {
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_basic_recover_ok_t;
|
||||
|
||||
#define AMQP_BASIC_NACK_METHOD ((amqp_method_number_t) 0x003C0078) /* 60, 120; 3932280 */
|
||||
typedef struct amqp_basic_nack_t_ {
|
||||
uint64_t delivery_tag;
|
||||
amqp_boolean_t multiple;
|
||||
amqp_boolean_t requeue;
|
||||
} amqp_basic_nack_t;
|
||||
|
||||
#define AMQP_TX_SELECT_METHOD ((amqp_method_number_t) 0x005A000A) /* 90, 10; 5898250 */
|
||||
typedef struct amqp_tx_select_t_ {
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_tx_select_t;
|
||||
|
||||
#define AMQP_TX_SELECT_OK_METHOD ((amqp_method_number_t) 0x005A000B) /* 90, 11; 5898251 */
|
||||
typedef struct amqp_tx_select_ok_t_ {
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_tx_select_ok_t;
|
||||
|
||||
#define AMQP_TX_COMMIT_METHOD ((amqp_method_number_t) 0x005A0014) /* 90, 20; 5898260 */
|
||||
typedef struct amqp_tx_commit_t_ {
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_tx_commit_t;
|
||||
|
||||
#define AMQP_TX_COMMIT_OK_METHOD ((amqp_method_number_t) 0x005A0015) /* 90, 21; 5898261 */
|
||||
typedef struct amqp_tx_commit_ok_t_ {
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_tx_commit_ok_t;
|
||||
|
||||
#define AMQP_TX_ROLLBACK_METHOD ((amqp_method_number_t) 0x005A001E) /* 90, 30; 5898270 */
|
||||
typedef struct amqp_tx_rollback_t_ {
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_tx_rollback_t;
|
||||
|
||||
#define AMQP_TX_ROLLBACK_OK_METHOD ((amqp_method_number_t) 0x005A001F) /* 90, 31; 5898271 */
|
||||
typedef struct amqp_tx_rollback_ok_t_ {
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_tx_rollback_ok_t;
|
||||
|
||||
#define AMQP_CONFIRM_SELECT_METHOD ((amqp_method_number_t) 0x0055000A) /* 85, 10; 5570570 */
|
||||
typedef struct amqp_confirm_select_t_ {
|
||||
amqp_boolean_t nowait;
|
||||
} amqp_confirm_select_t;
|
||||
|
||||
#define AMQP_CONFIRM_SELECT_OK_METHOD ((amqp_method_number_t) 0x0055000B) /* 85, 11; 5570571 */
|
||||
typedef struct amqp_confirm_select_ok_t_ {
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_confirm_select_ok_t;
|
||||
|
||||
/* Class property records. */
|
||||
#define AMQP_CONNECTION_CLASS (0x000A) /* 10 */
|
||||
typedef struct amqp_connection_properties_t_ {
|
||||
amqp_flags_t _flags;
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_connection_properties_t;
|
||||
|
||||
#define AMQP_CHANNEL_CLASS (0x0014) /* 20 */
|
||||
typedef struct amqp_channel_properties_t_ {
|
||||
amqp_flags_t _flags;
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_channel_properties_t;
|
||||
|
||||
#define AMQP_ACCESS_CLASS (0x001E) /* 30 */
|
||||
typedef struct amqp_access_properties_t_ {
|
||||
amqp_flags_t _flags;
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_access_properties_t;
|
||||
|
||||
#define AMQP_EXCHANGE_CLASS (0x0028) /* 40 */
|
||||
typedef struct amqp_exchange_properties_t_ {
|
||||
amqp_flags_t _flags;
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_exchange_properties_t;
|
||||
|
||||
#define AMQP_QUEUE_CLASS (0x0032) /* 50 */
|
||||
typedef struct amqp_queue_properties_t_ {
|
||||
amqp_flags_t _flags;
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_queue_properties_t;
|
||||
|
||||
#define AMQP_BASIC_CLASS (0x003C) /* 60 */
|
||||
#define AMQP_BASIC_CONTENT_TYPE_FLAG (1 << 15)
|
||||
#define AMQP_BASIC_CONTENT_ENCODING_FLAG (1 << 14)
|
||||
#define AMQP_BASIC_HEADERS_FLAG (1 << 13)
|
||||
#define AMQP_BASIC_DELIVERY_MODE_FLAG (1 << 12)
|
||||
#define AMQP_BASIC_PRIORITY_FLAG (1 << 11)
|
||||
#define AMQP_BASIC_CORRELATION_ID_FLAG (1 << 10)
|
||||
#define AMQP_BASIC_REPLY_TO_FLAG (1 << 9)
|
||||
#define AMQP_BASIC_EXPIRATION_FLAG (1 << 8)
|
||||
#define AMQP_BASIC_MESSAGE_ID_FLAG (1 << 7)
|
||||
#define AMQP_BASIC_TIMESTAMP_FLAG (1 << 6)
|
||||
#define AMQP_BASIC_TYPE_FLAG (1 << 5)
|
||||
#define AMQP_BASIC_USER_ID_FLAG (1 << 4)
|
||||
#define AMQP_BASIC_APP_ID_FLAG (1 << 3)
|
||||
#define AMQP_BASIC_CLUSTER_ID_FLAG (1 << 2)
|
||||
typedef struct amqp_basic_properties_t_ {
|
||||
amqp_flags_t _flags;
|
||||
amqp_bytes_t content_type;
|
||||
amqp_bytes_t content_encoding;
|
||||
amqp_table_t headers;
|
||||
uint8_t delivery_mode;
|
||||
uint8_t priority;
|
||||
amqp_bytes_t correlation_id;
|
||||
amqp_bytes_t reply_to;
|
||||
amqp_bytes_t expiration;
|
||||
amqp_bytes_t message_id;
|
||||
uint64_t timestamp;
|
||||
amqp_bytes_t type;
|
||||
amqp_bytes_t user_id;
|
||||
amqp_bytes_t app_id;
|
||||
amqp_bytes_t cluster_id;
|
||||
} amqp_basic_properties_t;
|
||||
|
||||
#define AMQP_TX_CLASS (0x005A) /* 90 */
|
||||
typedef struct amqp_tx_properties_t_ {
|
||||
amqp_flags_t _flags;
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_tx_properties_t;
|
||||
|
||||
#define AMQP_CONFIRM_CLASS (0x0055) /* 85 */
|
||||
typedef struct amqp_confirm_properties_t_ {
|
||||
amqp_flags_t _flags;
|
||||
char dummy; /* Dummy field to avoid empty struct */
|
||||
} amqp_confirm_properties_t;
|
||||
|
||||
/* API functions for methods */
|
||||
|
||||
AMQP_PUBLIC_FUNCTION amqp_channel_open_ok_t * AMQP_CALL amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel);
|
||||
AMQP_PUBLIC_FUNCTION amqp_channel_flow_ok_t * AMQP_CALL amqp_channel_flow(amqp_connection_state_t state, amqp_channel_t channel, amqp_boolean_t active);
|
||||
AMQP_PUBLIC_FUNCTION amqp_exchange_declare_ok_t * AMQP_CALL amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t type, amqp_boolean_t passive, amqp_boolean_t durable, amqp_table_t arguments);
|
||||
AMQP_PUBLIC_FUNCTION amqp_exchange_delete_ok_t * AMQP_CALL amqp_exchange_delete(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_boolean_t if_unused);
|
||||
AMQP_PUBLIC_FUNCTION amqp_exchange_bind_ok_t * AMQP_CALL amqp_exchange_bind(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t destination, amqp_bytes_t source, amqp_bytes_t routing_key, amqp_table_t arguments);
|
||||
AMQP_PUBLIC_FUNCTION amqp_exchange_unbind_ok_t * AMQP_CALL amqp_exchange_unbind(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t destination, amqp_bytes_t source, amqp_bytes_t routing_key, amqp_table_t arguments);
|
||||
AMQP_PUBLIC_FUNCTION amqp_queue_declare_ok_t * AMQP_CALL amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t exclusive, amqp_boolean_t auto_delete, amqp_table_t arguments);
|
||||
AMQP_PUBLIC_FUNCTION amqp_queue_bind_ok_t * AMQP_CALL amqp_queue_bind(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_table_t arguments);
|
||||
AMQP_PUBLIC_FUNCTION amqp_queue_purge_ok_t * AMQP_CALL amqp_queue_purge(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue);
|
||||
AMQP_PUBLIC_FUNCTION amqp_queue_delete_ok_t * AMQP_CALL amqp_queue_delete(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_boolean_t if_unused, amqp_boolean_t if_empty);
|
||||
AMQP_PUBLIC_FUNCTION amqp_queue_unbind_ok_t * AMQP_CALL amqp_queue_unbind(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_table_t arguments);
|
||||
AMQP_PUBLIC_FUNCTION amqp_basic_qos_ok_t * AMQP_CALL amqp_basic_qos(amqp_connection_state_t state, amqp_channel_t channel, uint32_t prefetch_size, uint16_t prefetch_count, amqp_boolean_t global);
|
||||
AMQP_PUBLIC_FUNCTION amqp_basic_consume_ok_t * AMQP_CALL amqp_basic_consume(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t consumer_tag, amqp_boolean_t no_local, amqp_boolean_t no_ack, amqp_boolean_t exclusive, amqp_table_t arguments);
|
||||
AMQP_PUBLIC_FUNCTION amqp_basic_cancel_ok_t * AMQP_CALL amqp_basic_cancel(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t consumer_tag);
|
||||
AMQP_PUBLIC_FUNCTION amqp_basic_recover_ok_t * AMQP_CALL amqp_basic_recover(amqp_connection_state_t state, amqp_channel_t channel, amqp_boolean_t requeue);
|
||||
AMQP_PUBLIC_FUNCTION amqp_tx_select_ok_t * AMQP_CALL amqp_tx_select(amqp_connection_state_t state, amqp_channel_t channel);
|
||||
AMQP_PUBLIC_FUNCTION amqp_tx_commit_ok_t * AMQP_CALL amqp_tx_commit(amqp_connection_state_t state, amqp_channel_t channel);
|
||||
AMQP_PUBLIC_FUNCTION amqp_tx_rollback_ok_t * AMQP_CALL amqp_tx_rollback(amqp_connection_state_t state, amqp_channel_t channel);
|
||||
AMQP_PUBLIC_FUNCTION amqp_confirm_select_ok_t * AMQP_CALL amqp_confirm_select(amqp_connection_state_t state, amqp_channel_t channel);
|
||||
|
||||
AMQP_END_DECLS
|
||||
|
||||
#endif /* AMQP_FRAMING_H */
|
|
@ -0,0 +1,161 @@
|
|||
/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
|
||||
/** \file */
|
||||
/*
|
||||
* Copyright 2012-2013 Michael Steinert
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a
|
||||
* copy of this software and associated documentation files (the "Software"),
|
||||
* to deal in the Software without restriction, including without limitation
|
||||
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
* and/or sell copies of the Software, and to permit persons to whom the
|
||||
* Software is furnished to do so, subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in
|
||||
* all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
* DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
#ifndef AMQP_SSL_H
|
||||
#define AMQP_SSL_H
|
||||
|
||||
#include <amqp.h>
|
||||
|
||||
AMQP_BEGIN_DECLS
|
||||
|
||||
/**
|
||||
* Create a new SSL/TLS socket object.
|
||||
*
|
||||
* The returned socket object is owned by the \ref amqp_connection_state_t object
|
||||
* and will be destroyed when the state object is destroyed or a new socket
|
||||
* object is created.
|
||||
*
|
||||
* If the socket object creation fails, the \ref amqp_connection_state_t object
|
||||
* will not be changed.
|
||||
*
|
||||
* The object returned by this function can be retrieved from the
|
||||
* amqp_connection_state_t object later using the amqp_get_socket() function.
|
||||
*
|
||||
* Calling this function may result in the underlying SSL library being initialized.
|
||||
* \sa amqp_set_initialize_ssl_library()
|
||||
*
|
||||
* \param [in,out] state The connection object that owns the SSL/TLS socket
|
||||
* \return A new socket object or NULL if an error occurred.
|
||||
*
|
||||
* \since v0.4.0
|
||||
*/
|
||||
AMQP_PUBLIC_FUNCTION
|
||||
amqp_socket_t *
|
||||
AMQP_CALL
|
||||
amqp_ssl_socket_new(amqp_connection_state_t state);
|
||||
|
||||
/**
|
||||
* Set the CA certificate.
|
||||
*
|
||||
* \param [in,out] self An SSL/TLS socket object.
|
||||
* \param [in] cacert Path to the CA cert file in PEM format.
|
||||
*
|
||||
* \return \ref AMQP_STATUS_OK on success an enum
|
||||
*
|
||||
* \since v0.4.0
|
||||
*/
|
||||
AMQP_PUBLIC_FUNCTION
|
||||
int
|
||||
AMQP_CALL
|
||||
amqp_ssl_socket_set_cacert(amqp_socket_t *self,
|
||||
const char *cacert);
|
||||
|
||||
/**
|
||||
* Set the client key.
|
||||
*
|
||||
* \param [in,out] self An SSL/TLS socket object.
|
||||
* \param [in] cert Path to the client certificate in PEM foramt.
|
||||
* \param [in] key Path to the client key in PEM format.
|
||||
*
|
||||
* \return Zero if successful, -1 otherwise.
|
||||
*
|
||||
* \since v0.4.0
|
||||
*/
|
||||
AMQP_PUBLIC_FUNCTION
|
||||
int
|
||||
AMQP_CALL
|
||||
amqp_ssl_socket_set_key(amqp_socket_t *self,
|
||||
const char *cert,
|
||||
const char *key);
|
||||
|
||||
/**
|
||||
* Set the client key from a buffer.
|
||||
*
|
||||
* \param [in,out] self An SSL/TLS socket object.
|
||||
* \param [in] cert Path to the client certificate in PEM foramt.
|
||||
* \param [in] key A buffer containing client key in PEM format.
|
||||
* \param [in] n The length of the buffer.
|
||||
*
|
||||
* \return Zero if successful, -1 otherwise.
|
||||
*
|
||||
* \since v0.4.0
|
||||
*/
|
||||
AMQP_PUBLIC_FUNCTION
|
||||
int
|
||||
AMQP_CALL
|
||||
amqp_ssl_socket_set_key_buffer(amqp_socket_t *self,
|
||||
const char *cert,
|
||||
const void *key,
|
||||
size_t n);
|
||||
|
||||
/**
|
||||
* Enable or disable peer verification.
|
||||
*
|
||||
* If peer verification is enabled then the common name in the server
|
||||
* certificate must match the server name. Peer verification is enabled by
|
||||
* default.
|
||||
*
|
||||
* \param [in,out] self An SSL/TLS socket object.
|
||||
* \param [in] verify Enable or disable peer verification.
|
||||
*
|
||||
* \since v0.4.0
|
||||
*/
|
||||
AMQP_PUBLIC_FUNCTION
|
||||
void
|
||||
AMQP_CALL
|
||||
amqp_ssl_socket_set_verify(amqp_socket_t *self,
|
||||
amqp_boolean_t verify);
|
||||
|
||||
/**
|
||||
* Sets whether rabbitmq-c initializes the underlying SSL library.
|
||||
*
|
||||
* For SSL libraries that require a one-time initialization across
|
||||
* a whole program (e.g., OpenSSL) this sets whether or not rabbitmq-c
|
||||
* will initialize the SSL library when the first call to
|
||||
* amqp_open_socket() is made. You should call this function with
|
||||
* do_init = 0 if the underlying SSL library is initialized somewhere else
|
||||
* the program.
|
||||
*
|
||||
* Failing to initialize or double initialization of the SSL library will
|
||||
* result in undefined behavior
|
||||
*
|
||||
* By default rabbitmq-c will initialize the underlying SSL library
|
||||
*
|
||||
* NOTE: calling this function after the first socket has been opened with
|
||||
* amqp_open_socket() will not have any effect.
|
||||
*
|
||||
* \param [in] do_initalize If 0 rabbitmq-c will not initialize the SSL
|
||||
* library, otherwise rabbitmq-c will initialize the
|
||||
* SSL library
|
||||
*
|
||||
* \since v0.4.0
|
||||
*/
|
||||
AMQP_PUBLIC_FUNCTION
|
||||
void
|
||||
AMQP_CALL
|
||||
amqp_set_initialize_ssl_library(amqp_boolean_t do_initialize);
|
||||
|
||||
AMQP_END_DECLS
|
||||
|
||||
#endif /* AMQP_SSL_H */
|
|
@ -0,0 +1,69 @@
|
|||
/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
|
||||
/** \file */
|
||||
/*
|
||||
* Copyright 2012-2013 Michael Steinert
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a
|
||||
* copy of this software and associated documentation files (the "Software"),
|
||||
* to deal in the Software without restriction, including without limitation
|
||||
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
|
||||
* and/or sell copies of the Software, and to permit persons to whom the
|
||||
* Software is furnished to do so, subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in
|
||||
* all copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
|
||||
* DEALINGS IN THE SOFTWARE.
|
||||
*/
|
||||
|
||||
/**
|
||||
* A TCP socket connection.
|
||||
*/
|
||||
|
||||
#ifndef AMQP_TCP_SOCKET_H
|
||||
#define AMQP_TCP_SOCKET_H
|
||||
|
||||
#include <amqp.h>
|
||||
|
||||
AMQP_BEGIN_DECLS
|
||||
|
||||
/**
|
||||
* Create a new TCP socket.
|
||||
*
|
||||
* Call amqp_socket_close() to release socket resources.
|
||||
*
|
||||
* \return A new socket object or NULL if an error occurred.
|
||||
*
|
||||
* \since v0.4.0
|
||||
*/
|
||||
AMQP_PUBLIC_FUNCTION
|
||||
amqp_socket_t *
|
||||
AMQP_CALL
|
||||
amqp_tcp_socket_new(amqp_connection_state_t state);
|
||||
|
||||
/**
|
||||
* Assign an open file descriptor to a socket object.
|
||||
*
|
||||
* This function must not be used in conjunction with amqp_socket_open(), i.e.
|
||||
* the socket connection should already be open(2) when this function is
|
||||
* called.
|
||||
*
|
||||
* \param [in,out] self A TCP socket object.
|
||||
* \param [in] sockfd An open socket descriptor.
|
||||
*
|
||||
* \since v0.4.0
|
||||
*/
|
||||
AMQP_PUBLIC_FUNCTION
|
||||
void
|
||||
AMQP_CALL
|
||||
amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd);
|
||||
|
||||
AMQP_END_DECLS
|
||||
|
||||
#endif /* AMQP_TCP_SOCKET_H */
|
File diff suppressed because it is too large
Load Diff
|
@ -0,0 +1,146 @@
|
|||
/* This file is an autogenerated single-file version of liblcfg.
|
||||
* It is recommended that you update this file on a regular
|
||||
* basis from the original liblcfg distribution package.
|
||||
*
|
||||
* The most recent version of liblcfg is available at
|
||||
* <http://liblcfg.carnivore.it>
|
||||
*/
|
||||
/*
|
||||
Copyright (c) 2012, Paul Baecher
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
* Redistributions of source code must retain the above copyright
|
||||
notice, this list of conditions and the following disclaimer.
|
||||
* Redistributions in binary form must reproduce the above copyright
|
||||
notice, this list of conditions and the following disclaimer in the
|
||||
documentation and/or other materials provided with the distribution.
|
||||
* Neither the name of the <organization> nor the
|
||||
names of its contributors may be used to endorse or promote products
|
||||
derived from this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
|
||||
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
DISCLAIMED. IN NO EVENT SHALL <THE COPYRIGHT HOLDER> BE LIABLE FOR ANY
|
||||
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
||||
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
||||
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
|
||||
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
||||
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
#ifndef LCFG_H
|
||||
#define LCFG_H
|
||||
|
||||
#include <stdlib.h>
|
||||
|
||||
struct lcfg;
|
||||
|
||||
enum lcfg_status { lcfg_status_ok, lcfg_status_error };
|
||||
|
||||
typedef enum lcfg_status (*lcfg_visitor_function)(const char *key, void *data, size_t size, void *user_data);
|
||||
|
||||
|
||||
/* open a new config file */
|
||||
struct lcfg * lcfg_new(const char *filename);
|
||||
|
||||
/* parse config into memory */
|
||||
enum lcfg_status lcfg_parse(struct lcfg *);
|
||||
|
||||
/* visit all configuration elements */
|
||||
enum lcfg_status lcfg_accept(struct lcfg *, lcfg_visitor_function, void *);
|
||||
|
||||
/* access a value by path */
|
||||
enum lcfg_status lcfg_value_get(struct lcfg *, const char *, void **, size_t *);
|
||||
|
||||
/* return the last error message */
|
||||
const char * lcfg_error_get(struct lcfg *);
|
||||
|
||||
/* set error */
|
||||
void lcfg_error_set(struct lcfg *, const char *fmt, ...);
|
||||
|
||||
/* destroy lcfg context */
|
||||
void lcfg_delete(struct lcfg *);
|
||||
|
||||
|
||||
#endif
|
||||
/*
|
||||
Copyright (c) 2012, Paul Baecher
|
||||
All rights reserved.
|
||||
|
||||
Redistribution and use in source and binary forms, with or without
|
||||
modification, are permitted provided that the following conditions are met:
|
||||
* Redistributions of source code must retain the above copyright
|
||||
notice, this list of conditions and the following disclaimer.
|
||||
* Redistributions in binary form must reproduce the above copyright
|
||||
notice, this list of conditions and the following disclaimer in the
|
||||
documentation and/or other materials provided with the distribution.
|
||||
* Neither the name of the <organization> nor the
|
||||
names of its contributors may be used to endorse or promote products
|
||||
derived from this software without specific prior written permission.
|
||||
|
||||
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
|
||||
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
|
||||
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
|
||||
DISCLAIMED. IN NO EVENT SHALL <THE COPYRIGHT HOLDER> BE LIABLE FOR ANY
|
||||
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
|
||||
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
|
||||
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
|
||||
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
|
||||
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
#ifndef LCFGX_TREE_H
|
||||
#define LCFGX_TREE_H
|
||||
|
||||
|
||||
enum lcfgx_type
|
||||
{
|
||||
lcfgx_string,
|
||||
lcfgx_list,
|
||||
lcfgx_map,
|
||||
};
|
||||
|
||||
struct lcfgx_tree_node
|
||||
{
|
||||
enum lcfgx_type type;
|
||||
char *key; /* NULL for root node */
|
||||
|
||||
union
|
||||
{
|
||||
struct
|
||||
{
|
||||
void *data;
|
||||
size_t len;
|
||||
} string;
|
||||
struct lcfgx_tree_node *elements; /* in case of list or map type */
|
||||
} value;
|
||||
|
||||
struct lcfgx_tree_node *next;
|
||||
};
|
||||
|
||||
struct lcfgx_tree_node *lcfgx_tree_new(struct lcfg *);
|
||||
|
||||
void lcfgx_tree_delete(struct lcfgx_tree_node *);
|
||||
void lcfgx_tree_dump(struct lcfgx_tree_node *node, int depth);
|
||||
|
||||
enum lcfgx_path_access
|
||||
{
|
||||
LCFGX_PATH_NOT_FOUND,
|
||||
LCFGX_PATH_FOUND_WRONG_TYPE_BAD,
|
||||
LCFGX_PATH_FOUND_TYPE_OK,
|
||||
};
|
||||
|
||||
extern const char *lcfgx_path_access_strings[];
|
||||
|
||||
|
||||
enum lcfgx_path_access lcfgx_get(struct lcfgx_tree_node *root, struct lcfgx_tree_node **n, const char *key, enum lcfgx_type type);
|
||||
enum lcfgx_path_access lcfgx_get_list(struct lcfgx_tree_node *root, struct lcfgx_tree_node **n, const char *key);
|
||||
enum lcfgx_path_access lcfgx_get_map(struct lcfgx_tree_node *root, struct lcfgx_tree_node **n, const char *key);
|
||||
enum lcfgx_path_access lcfgx_get_string(struct lcfgx_tree_node *root, struct lcfgx_tree_node **n, const char *key);
|
||||
|
||||
|
||||
#endif
|
||||
|
Binary file not shown.
|
@ -0,0 +1,239 @@
|
|||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <unistd.h>
|
||||
#include <string.h>
|
||||
#include <stdint.h>
|
||||
#include <assert.h>
|
||||
#include <fcntl.h>
|
||||
#include <sys/stat.h>
|
||||
#include <sys/types.h>
|
||||
|
||||
#include <amqp_tcp_socket.h>
|
||||
#include <amqp.h>
|
||||
#include <amqp_framing.h>
|
||||
|
||||
#include "utils.h"
|
||||
#include "lcfg_static.h"
|
||||
|
||||
|
||||
static void run ( amqp_connection_state_t conn, int log_fd , const char *result_routing_key )
|
||||
{
|
||||
int received = 0;
|
||||
amqp_frame_t frame;
|
||||
|
||||
while ( 1 )
|
||||
{
|
||||
amqp_rpc_reply_t ret;
|
||||
amqp_envelope_t envelope;
|
||||
|
||||
amqp_maybe_release_buffers ( conn );
|
||||
ret = amqp_consume_message ( conn, &envelope, NULL, 0 );
|
||||
|
||||
if ( AMQP_RESPONSE_NORMAL == ret.reply_type )
|
||||
{
|
||||
int i;
|
||||
amqp_bytes_t body = envelope.message.body;
|
||||
const char *title = "A new message received:\n";
|
||||
|
||||
fprintf ( stdout, title, received );
|
||||
for ( i = 0; i < body.len; i++ )
|
||||
{
|
||||
fprintf ( stdout, "%c", * ( char* ) ( body.bytes + i ) );
|
||||
}
|
||||
puts ( "\n" );
|
||||
|
||||
write ( log_fd, ( void * ) title, strlen ( title ) );
|
||||
write ( log_fd, body.bytes, body.len );
|
||||
write ( log_fd, ( void * ) "\n\n", 2 );
|
||||
|
||||
/* Send a reply. */
|
||||
amqp_basic_properties_t props;
|
||||
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_MESSAGE_ID_FLAG;
|
||||
|
||||
printf("message id: %s", (const char*)envelope.message.properties.message_id.bytes);
|
||||
|
||||
props.message_id = amqp_bytes_malloc_dup ( envelope.message.properties.message_id );
|
||||
props.content_type = amqp_cstring_bytes ( "text/json" );
|
||||
props.delivery_mode = 2; /* persistent delivery mode */
|
||||
|
||||
const char *result_body = "{\"IsException\": false, \"Result\": [{\"IsException\": false, \"Result\": []}]}";
|
||||
|
||||
die_on_error ( amqp_basic_publish ( conn,
|
||||
1,
|
||||
amqp_cstring_bytes ( "" ),
|
||||
amqp_cstring_bytes ( result_routing_key ),
|
||||
0,
|
||||
0,
|
||||
&props,
|
||||
amqp_cstring_bytes ( result_body ) ),
|
||||
"Publishing" );
|
||||
|
||||
amqp_destroy_envelope ( &envelope );
|
||||
}
|
||||
else
|
||||
{
|
||||
if ( AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type &&
|
||||
AMQP_STATUS_UNEXPECTED_STATE == ret.library_error )
|
||||
{
|
||||
if ( AMQP_STATUS_OK != amqp_simple_wait_frame ( conn, &frame ) )
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
if ( AMQP_FRAME_METHOD == frame.frame_type )
|
||||
{
|
||||
switch ( frame.payload.method.id )
|
||||
{
|
||||
case AMQP_BASIC_ACK_METHOD:
|
||||
/* if we've turned publisher confirms on, and we've published a message
|
||||
* here is a message being confirmed
|
||||
*/
|
||||
|
||||
break;
|
||||
case AMQP_BASIC_RETURN_METHOD:
|
||||
/* if a published message couldn't be routed and the mandatory flag was set
|
||||
* this is what would be returned. The message then needs to be read.
|
||||
*/
|
||||
{
|
||||
amqp_message_t message;
|
||||
ret = amqp_read_message ( conn, frame.channel, &message, 0 );
|
||||
|
||||
if ( AMQP_RESPONSE_NORMAL != ret.reply_type )
|
||||
{
|
||||
return;
|
||||
}
|
||||
|
||||
amqp_destroy_message ( &message );
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
case AMQP_CHANNEL_CLOSE_METHOD:
|
||||
/* a channel.close method happens when a channel exception occurs, this
|
||||
* can happen by publishing to an exchange that doesn't exist for example
|
||||
*
|
||||
* In this case you would need to open another channel redeclare any queues
|
||||
* that were declared auto-delete, and restart any consumers that were attached
|
||||
* to the previous channel
|
||||
*/
|
||||
return;
|
||||
|
||||
case AMQP_CONNECTION_CLOSE_METHOD:
|
||||
/* a connection.close method happens when a connection exception occurs,
|
||||
* this can happen by trying to use a channel that isn't open for example.
|
||||
*
|
||||
* In this case the whole connection must be restarted.
|
||||
*/
|
||||
return;
|
||||
|
||||
default:
|
||||
fprintf ( stderr ,"An unexpected method was received %d\n", frame.payload.method.id );
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
received++;
|
||||
}
|
||||
}
|
||||
|
||||
static const char* get_config_value ( struct lcfg *cfg, const char *key, int verbose )
|
||||
{
|
||||
void *data;
|
||||
size_t len;
|
||||
|
||||
if ( lcfg_value_get ( cfg, key, &data, &len ) != lcfg_status_ok )
|
||||
{
|
||||
fprintf ( stderr, "Key %s is not found in the configuration file", key );
|
||||
}
|
||||
|
||||
const char *val = ( const char * ) data;
|
||||
|
||||
if ( verbose )
|
||||
{
|
||||
fprintf ( stdout, "%s = %s\n", key, val );
|
||||
}
|
||||
|
||||
return val;
|
||||
}
|
||||
|
||||
int main ( int argc, char const *const *argv )
|
||||
{
|
||||
if ( argc != 3 )
|
||||
{
|
||||
printf ( "usage: %s CFG_FILE LOG_FILE\n", argv[0] );
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
const char *log_filename = argv[2];
|
||||
int flags = O_CREAT | O_APPEND | O_RDWR;
|
||||
mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
|
||||
int log_fd = open ( log_filename, flags, mode );
|
||||
|
||||
if ( log_fd < 0 )
|
||||
{
|
||||
fprintf ( stderr, "ERROR: Falied to open the log file '%s'\n", log_filename );
|
||||
|
||||
exit ( 1 );
|
||||
}
|
||||
|
||||
/* Read the configuration file. */
|
||||
struct lcfg *cfg = lcfg_new ( argv[1] );
|
||||
|
||||
if ( lcfg_parse ( cfg ) != lcfg_status_ok )
|
||||
{
|
||||
printf ( "lcfg error: %s\n", lcfg_error_get ( cfg ) );
|
||||
|
||||
return -1;
|
||||
}
|
||||
|
||||
/* Read all the configuration parameters. */
|
||||
fprintf ( stdout, "Starting Murano agent with the following configuration:\n\n" );
|
||||
|
||||
const char *host = get_config_value ( cfg, "RABBITMQ_HOST" , 1 );
|
||||
int port = atoi ( get_config_value ( cfg, "RABBITMQ_PORT" , 1 ) );
|
||||
const char *vhost = get_config_value ( cfg, "RABBITMQ_VHOST" , 1 );
|
||||
const char *username = get_config_value ( cfg, "RABBITMQ_USERNAME" , 1 );
|
||||
const char *password = get_config_value ( cfg, "RABBITMQ_PASSWORD" , 1 );
|
||||
const char *queuename = get_config_value ( cfg, "RABBITMQ_INPUT_QUEUE" , 1 );
|
||||
const char *result_routing_key = get_config_value ( cfg, "RABBITMQ_RESULT_ROUTING_KEY", 1 );
|
||||
|
||||
amqp_connection_state_t conn = amqp_new_connection();
|
||||
amqp_socket_t *socket = NULL;
|
||||
amqp_bytes_t queuename_bytes = amqp_cstring_bytes ( queuename );
|
||||
|
||||
socket = amqp_tcp_socket_new ( conn );
|
||||
if ( !socket )
|
||||
{
|
||||
die ( "creating TCP socket" );
|
||||
}
|
||||
|
||||
if ( amqp_socket_open ( socket, host, port ) )
|
||||
{
|
||||
die ( "opening TCP socket" );
|
||||
}
|
||||
|
||||
die_on_amqp_error ( amqp_login ( conn, vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, username, password ),
|
||||
"Logging in" );
|
||||
amqp_channel_open ( conn, 1 );
|
||||
die_on_amqp_error ( amqp_get_rpc_reply ( conn ), "Opening channel" );
|
||||
|
||||
amqp_basic_consume ( conn, 1, queuename_bytes, amqp_empty_bytes, 0, 1, 0, amqp_empty_table );
|
||||
die_on_amqp_error ( amqp_get_rpc_reply ( conn ), "Consuming" );
|
||||
|
||||
puts ( "\nSuccessfully connected to Rabbit MQ server! Ready for messages..." );
|
||||
|
||||
run ( conn, log_fd , result_routing_key );
|
||||
|
||||
close ( log_fd );
|
||||
lcfg_delete ( cfg );
|
||||
|
||||
die_on_amqp_error ( amqp_channel_close ( conn, 1, AMQP_REPLY_SUCCESS ), "Closing channel" );
|
||||
die_on_amqp_error ( amqp_connection_close ( conn, AMQP_REPLY_SUCCESS ), "Closing connection" );
|
||||
die_on_error ( amqp_destroy_connection ( conn ), "Ending connection" );
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -0,0 +1,7 @@
|
|||
RABBITMQ_HOST = "localhost"
|
||||
RABBITMQ_PORT = "5672"
|
||||
RABBITMQ_USERNAME = "guest"
|
||||
RABBITMQ_PASSWORD = "guest"
|
||||
RABBITMQ_VHOST = "/"
|
||||
RABBITMQ_INPUT_QUEUE = "test queue"
|
||||
RABBITMQ_RESULT_ROUTING_KEY = "result queue"
|
|
@ -0,0 +1,128 @@
|
|||
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
|
||||
#include <stdint.h>
|
||||
#include <amqp_tcp_socket.h>
|
||||
#include <amqp.h>
|
||||
#include <amqp_framing.h>
|
||||
|
||||
#include "utils.h"
|
||||
|
||||
#define SUMMARY_EVERY_US 1000000
|
||||
|
||||
static void send_batch ( amqp_connection_state_t conn,
|
||||
char const *queue_name,
|
||||
int rate_limit,
|
||||
int message_count )
|
||||
{
|
||||
uint64_t start_time = now_microseconds();
|
||||
int i;
|
||||
int sent = 0;
|
||||
int previous_sent = 0;
|
||||
uint64_t previous_report_time = start_time;
|
||||
uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
|
||||
|
||||
const char *message = "Hello from the producer!";
|
||||
amqp_bytes_t message_bytes;
|
||||
|
||||
message_bytes.len = strlen ( message );
|
||||
message_bytes.bytes = ( void * ) message;
|
||||
|
||||
for ( i = 0; i < message_count; i++ )
|
||||
{
|
||||
uint64_t now = now_microseconds();
|
||||
|
||||
amqp_basic_properties_t props;
|
||||
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_MESSAGE_ID_FLAG;
|
||||
props.message_id = amqp_cstring_bytes( " msg_id " );
|
||||
props.content_type = amqp_cstring_bytes ( "text/json" );
|
||||
props.delivery_mode = 2; /* persistent delivery mode */
|
||||
|
||||
die_on_error ( amqp_basic_publish ( conn,
|
||||
1,
|
||||
amqp_cstring_bytes ( "" ),
|
||||
amqp_cstring_bytes ( queue_name ),
|
||||
0,
|
||||
0,
|
||||
&props,
|
||||
message_bytes ),
|
||||
"Publishing" );
|
||||
sent++;
|
||||
if ( now > next_summary_time )
|
||||
{
|
||||
int countOverInterval = sent - previous_sent;
|
||||
double intervalRate = countOverInterval / ( ( now - previous_report_time ) / 1000000.0 );
|
||||
printf ( "%d ms: Sent %d - %d since last report (%d Hz)\n",
|
||||
( int ) ( now - start_time ) / 1000, sent, countOverInterval, ( int ) intervalRate );
|
||||
|
||||
previous_sent = sent;
|
||||
previous_report_time = now;
|
||||
next_summary_time += SUMMARY_EVERY_US;
|
||||
}
|
||||
|
||||
while ( ( ( i * 1000000.0 ) / ( now - start_time ) ) > rate_limit )
|
||||
{
|
||||
microsleep ( 2000 );
|
||||
now = now_microseconds();
|
||||
}
|
||||
}
|
||||
|
||||
{
|
||||
uint64_t stop_time = now_microseconds();
|
||||
int total_delta = stop_time - start_time;
|
||||
|
||||
printf ( "PRODUCER - Message count: %d\n", message_count );
|
||||
printf ( "Total time, milliseconds: %d\n", total_delta / 1000 );
|
||||
printf ( "Overall messages-per-second: %g\n", ( message_count / ( total_delta / 1000000.0 ) ) );
|
||||
}
|
||||
}
|
||||
|
||||
int main ( int argc, char const *const *argv )
|
||||
{
|
||||
char const *hostname;
|
||||
int port, status;
|
||||
int rate_limit;
|
||||
int message_count;
|
||||
amqp_socket_t *socket = NULL;
|
||||
amqp_connection_state_t conn;
|
||||
|
||||
if ( argc < 5 )
|
||||
{
|
||||
fprintf ( stderr, "Usage: producer host port rate_limit message_count\n" );
|
||||
return 1;
|
||||
}
|
||||
|
||||
hostname = argv[1];
|
||||
port = atoi ( argv[2] );
|
||||
rate_limit = atoi ( argv[3] );
|
||||
message_count = atoi ( argv[4] );
|
||||
|
||||
conn = amqp_new_connection();
|
||||
|
||||
socket = amqp_tcp_socket_new ( conn );
|
||||
if ( !socket )
|
||||
{
|
||||
die ( "creating TCP socket" );
|
||||
}
|
||||
|
||||
status = amqp_socket_open ( socket, hostname, port );
|
||||
if ( status )
|
||||
{
|
||||
die ( "opening TCP socket" );
|
||||
}
|
||||
|
||||
die_on_amqp_error ( amqp_login ( conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest" ),
|
||||
"Logging in" );
|
||||
amqp_channel_open ( conn, 1 );
|
||||
die_on_amqp_error ( amqp_get_rpc_reply ( conn ), "Opening channel" );
|
||||
|
||||
send_batch ( conn, "test queue", rate_limit, message_count );
|
||||
|
||||
die_on_amqp_error ( amqp_channel_close ( conn, 1, AMQP_REPLY_SUCCESS ), "Closing channel" );
|
||||
die_on_amqp_error ( amqp_connection_close ( conn, AMQP_REPLY_SUCCESS ), "Closing connection" );
|
||||
die_on_error ( amqp_destroy_connection ( conn ), "Ending connection" );
|
||||
|
||||
return 0;
|
||||
}
|
|
@ -0,0 +1,201 @@
|
|||
#include <stdarg.h>
|
||||
#include <stdlib.h>
|
||||
#include <stdio.h>
|
||||
#include <string.h>
|
||||
#include <ctype.h>
|
||||
|
||||
#include <stdint.h>
|
||||
#include <amqp.h>
|
||||
#include <amqp_framing.h>
|
||||
|
||||
#include "utils.h"
|
||||
|
||||
/* For usleep */
|
||||
/* #define _BSD_SOURCE */
|
||||
|
||||
#include <stdint.h>
|
||||
|
||||
#include <sys/time.h>
|
||||
#include <unistd.h>
|
||||
|
||||
uint64_t now_microseconds ( void )
|
||||
{
|
||||
struct timeval tv;
|
||||
gettimeofday ( &tv, NULL );
|
||||
return ( uint64_t ) tv.tv_sec * 1000000 + ( uint64_t ) tv.tv_usec;
|
||||
}
|
||||
|
||||
void microsleep ( int usec )
|
||||
{
|
||||
usleep ( usec );
|
||||
}
|
||||
|
||||
void die ( const char *fmt, ... )
|
||||
{
|
||||
va_list ap;
|
||||
va_start ( ap, fmt );
|
||||
vfprintf ( stderr, fmt, ap );
|
||||
va_end ( ap );
|
||||
fprintf ( stderr, "\n" );
|
||||
exit ( 1 );
|
||||
}
|
||||
|
||||
void die_on_error ( int x, char const *context )
|
||||
{
|
||||
if ( x < 0 )
|
||||
{
|
||||
fprintf ( stderr, "%s: %s\n", context, amqp_error_string2 ( x ) );
|
||||
exit ( 1 );
|
||||
}
|
||||
}
|
||||
|
||||
void die_on_amqp_error ( amqp_rpc_reply_t x, char const *context )
|
||||
{
|
||||
switch ( x.reply_type )
|
||||
{
|
||||
case AMQP_RESPONSE_NORMAL:
|
||||
return;
|
||||
|
||||
case AMQP_RESPONSE_NONE:
|
||||
fprintf ( stderr, "%s: missing RPC reply type!\n", context );
|
||||
break;
|
||||
|
||||
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
|
||||
fprintf ( stderr, "%s: %s\n", context, amqp_error_string2 ( x.library_error ) );
|
||||
break;
|
||||
|
||||
case AMQP_RESPONSE_SERVER_EXCEPTION:
|
||||
switch ( x.reply.id )
|
||||
{
|
||||
case AMQP_CONNECTION_CLOSE_METHOD:
|
||||
{
|
||||
amqp_connection_close_t *m = ( amqp_connection_close_t * ) x.reply.decoded;
|
||||
fprintf ( stderr, "%s: server connection error %d, message: %.*s\n",
|
||||
context,
|
||||
m->reply_code,
|
||||
( int ) m->reply_text.len, ( char * ) m->reply_text.bytes );
|
||||
break;
|
||||
}
|
||||
case AMQP_CHANNEL_CLOSE_METHOD:
|
||||
{
|
||||
amqp_channel_close_t *m = ( amqp_channel_close_t * ) x.reply.decoded;
|
||||
fprintf ( stderr, "%s: server channel error %d, message: %.*s\n",
|
||||
context,
|
||||
m->reply_code,
|
||||
( int ) m->reply_text.len, ( char * ) m->reply_text.bytes );
|
||||
break;
|
||||
}
|
||||
default:
|
||||
fprintf ( stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id );
|
||||
break;
|
||||
}
|
||||
break;
|
||||
}
|
||||
|
||||
exit ( 1 );
|
||||
}
|
||||
|
||||
static void dump_row ( long count, int numinrow, int *chs )
|
||||
{
|
||||
int i;
|
||||
|
||||
printf ( "%08lX:", count - numinrow );
|
||||
|
||||
if ( numinrow > 0 )
|
||||
{
|
||||
for ( i = 0; i < numinrow; i++ )
|
||||
{
|
||||
if ( i == 8 )
|
||||
{
|
||||
printf ( " :" );
|
||||
}
|
||||
printf ( " %02X", chs[i] );
|
||||
}
|
||||
for ( i = numinrow; i < 16; i++ )
|
||||
{
|
||||
if ( i == 8 )
|
||||
{
|
||||
printf ( " :" );
|
||||
}
|
||||
printf ( " " );
|
||||
}
|
||||
printf ( " " );
|
||||
for ( i = 0; i < numinrow; i++ )
|
||||
{
|
||||
if ( isprint ( chs[i] ) )
|
||||
{
|
||||
printf ( "%c", chs[i] );
|
||||
}
|
||||
else
|
||||
{
|
||||
printf ( "." );
|
||||
}
|
||||
}
|
||||
}
|
||||
printf ( "\n" );
|
||||
}
|
||||
|
||||
static int rows_eq ( int *a, int *b )
|
||||
{
|
||||
int i;
|
||||
|
||||
for ( i=0; i<16; i++ )
|
||||
if ( a[i] != b[i] )
|
||||
{
|
||||
return 0;
|
||||
}
|
||||
|
||||
return 1;
|
||||
}
|
||||
|
||||
void amqp_dump ( void const *buffer, size_t len )
|
||||
{
|
||||
unsigned char *buf = ( unsigned char * ) buffer;
|
||||
long count = 0;
|
||||
int numinrow = 0;
|
||||
int chs[16];
|
||||
int oldchs[16] = {0};
|
||||
int showed_dots = 0;
|
||||
size_t i;
|
||||
|
||||
for ( i = 0; i < len; i++ )
|
||||
{
|
||||
int ch = buf[i];
|
||||
|
||||
if ( numinrow == 16 )
|
||||
{
|
||||
int i;
|
||||
|
||||
if ( rows_eq ( oldchs, chs ) )
|
||||
{
|
||||
if ( !showed_dots )
|
||||
{
|
||||
showed_dots = 1;
|
||||
printf ( " .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n" );
|
||||
}
|
||||
}
|
||||
else
|
||||
{
|
||||
showed_dots = 0;
|
||||
dump_row ( count, numinrow, chs );
|
||||
}
|
||||
|
||||
for ( i=0; i<16; i++ )
|
||||
{
|
||||
oldchs[i] = chs[i];
|
||||
}
|
||||
|
||||
numinrow = 0;
|
||||
}
|
||||
|
||||
count++;
|
||||
chs[numinrow++] = ch;
|
||||
}
|
||||
|
||||
dump_row ( count, numinrow, chs );
|
||||
|
||||
if ( numinrow != 0 )
|
||||
{
|
||||
printf ( "%08lX:\n", count );
|
||||
}
|
||||
}
|
|
@ -0,0 +1,13 @@
|
|||
#ifndef agent_utils_h
|
||||
#define agent_utils_h
|
||||
|
||||
void die(const char *fmt, ...);
|
||||
extern void die_on_error(int x, char const *context);
|
||||
extern void die_on_amqp_error(amqp_rpc_reply_t x, char const *context);
|
||||
|
||||
extern void amqp_dump(void const *buffer, size_t len);
|
||||
|
||||
extern uint64_t now_microseconds(void);
|
||||
extern void microsleep(int usec);
|
||||
|
||||
#endif
|
Loading…
Reference in New Issue