Merge "Add C demo agent"

This commit is contained in:
Jenkins 2013-10-21 14:09:45 +00:00 committed by Gerrit Code Review
commit bb89406f12
14 changed files with 5639 additions and 0 deletions

38
c-demo-agent/Makefile Normal file
View File

@ -0,0 +1,38 @@
# Makefile for murano agent
CFLAGS= -Wall
default_target: all
all: agent-binary producer-binary
# NOTE: Use this if RabbitMQ needs to be linked in as a dynamic library
#agent-binary: murano-agent.o lcfg_static.o utils.o
# gcc -o murano-agent murano-agent.o lcfg_static.o utils.o -lrabbitmq -lrt
agent-binary: murano-agent.o lcfg_static.o utils.o librabbitmq.a
gcc $(CFLAGS) -o murano-agent murano-agent.o lcfg_static.o utils.o -L. -lrabbitmq -lrt
# NOTE: Use this if RabbitMQ needs to be linked in as a dynamic library
#producer-binary: producer.o utils.o
# gcc -o producer producer.o utils.o -lrabbitmq -lrt
producer-binary: producer.o utils.o librabbitmq.a
gcc $(CFLAGS) -o producer producer.o utils.o -L. -lrabbitmq -lrt
murano-agent.o: murano-agent.c
gcc $(CFLAGS) -c -I. -fPIC murano-agent.c
producer.o: producer.c
gcc $(CFLAGS) -c -I. -fPIC producer.c
lcfg_static.o: lcfg_static.c
gcc $(CFLAGS) -c -fPIC lcfg_static.c
utils.o: utils.c
gcc $(CFLAGS) -c -I. -fPIC utils.c
clean:
rm -f *.o murano-agent producer

64
c-demo-agent/Readme Normal file
View File

@ -0,0 +1,64 @@
This is a demo implementation of Murano agent.
=== The scenario ===
Effectively the agent only connects to an AMQP queue
using properties from the configuration file, consumes
all the messages from the queue, logs them and puts
result messages into a result queue. Result messages
are just stubs signaling success back to the calling
application.
=== Building binaries ===
Build process has been tested on Ubuntu 13.04 only for
x86_64 architecture. Note that the distribution contains
RabbitMQ client pre-built as a static library. It gets
linked with demo agent object files statically. It was
done to eliminate the need to build and install RabbitMQ
client library separately in case of using Ubuntu 64 bit.
In order to build demo agent for embedded operating systems
based on BusyBox technology (such as Cirros) refer to
Buildroot toolkit (http://buildroot.uclibc.org/).
To build agent binary:
make agent-binary
To build test producer binary:
make prodcer-binary
To build both binaries:
make
To clean up the source directory:
make clean
=== Usage ===
To run agent:
./murano-agent <config_file> <log_file>
<config_file>:
Refer to murano-agent.conf.example file to find out
what configuration properties are possible.
<log_file>:
Path to log file which agent writes the messages into.
To run test message producer:
./producer <host> <port> <message_rate> <message_count>
<host>:
RabbitMQ server host for the producer to connect to.
<port>:
RabbitMQ server port for the producer to connect to.
<message_rate>:
Message rate at which the producer sends messages
(delay in seconds between messages).
<message_count>:
A number of messages the producer should send.

2302
c-demo-agent/amqp.h Normal file

File diff suppressed because it is too large Load Diff

630
c-demo-agent/amqp_framing.h Normal file
View File

@ -0,0 +1,630 @@
/* Generated code. Do not edit. Edit and re-run codegen.py instead.
*
* ***** BEGIN LICENSE BLOCK *****
* Version: MIT
*
* Portions created by Alan Antonuk are Copyright (c) 2012-2013
* Alan Antonuk. All Rights Reserved.
*
* Portions created by VMware are Copyright (c) 2007-2012 VMware, Inc.
* All Rights Reserved.
*
* Portions created by Tony Garnock-Jones are Copyright (c) 2009-2010
* VMware, Inc. and Tony Garnock-Jones. All Rights Reserved.
*
* Permission is hereby granted, free of charge, to any person
* obtaining a copy of this software and associated documentation
* files (the "Software"), to deal in the Software without
* restriction, including without limitation the rights to use, copy,
* modify, merge, publish, distribute, sublicense, and/or sell copies
* of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be
* included in all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
* EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
* MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS
* BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN
* ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
* CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* ***** END LICENSE BLOCK *****
*/
#ifndef AMQP_FRAMING_H
#define AMQP_FRAMING_H
#include <amqp.h>
AMQP_BEGIN_DECLS
#define AMQP_PROTOCOL_VERSION_MAJOR 0
#define AMQP_PROTOCOL_VERSION_MINOR 9
#define AMQP_PROTOCOL_VERSION_REVISION 1
#define AMQP_PROTOCOL_PORT 5672
#define AMQP_FRAME_METHOD 1
#define AMQP_FRAME_HEADER 2
#define AMQP_FRAME_BODY 3
#define AMQP_FRAME_HEARTBEAT 8
#define AMQP_FRAME_MIN_SIZE 4096
#define AMQP_FRAME_END 206
#define AMQP_REPLY_SUCCESS 200
#define AMQP_CONTENT_TOO_LARGE 311
#define AMQP_NO_ROUTE 312
#define AMQP_NO_CONSUMERS 313
#define AMQP_ACCESS_REFUSED 403
#define AMQP_NOT_FOUND 404
#define AMQP_RESOURCE_LOCKED 405
#define AMQP_PRECONDITION_FAILED 406
#define AMQP_CONNECTION_FORCED 320
#define AMQP_INVALID_PATH 402
#define AMQP_FRAME_ERROR 501
#define AMQP_SYNTAX_ERROR 502
#define AMQP_COMMAND_INVALID 503
#define AMQP_CHANNEL_ERROR 504
#define AMQP_UNEXPECTED_FRAME 505
#define AMQP_RESOURCE_ERROR 506
#define AMQP_NOT_ALLOWED 530
#define AMQP_NOT_IMPLEMENTED 540
#define AMQP_INTERNAL_ERROR 541
/* Function prototypes. */
AMQP_PUBLIC_FUNCTION
char const *
AMQP_CALL amqp_constant_name(int constantNumber);
AMQP_PUBLIC_FUNCTION
amqp_boolean_t
AMQP_CALL amqp_constant_is_hard_error(int constantNumber);
AMQP_PUBLIC_FUNCTION
char const *
AMQP_CALL amqp_method_name(amqp_method_number_t methodNumber);
AMQP_PUBLIC_FUNCTION
amqp_boolean_t
AMQP_CALL amqp_method_has_content(amqp_method_number_t methodNumber);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_decode_method(amqp_method_number_t methodNumber,
amqp_pool_t *pool,
amqp_bytes_t encoded,
void **decoded);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_decode_properties(uint16_t class_id,
amqp_pool_t *pool,
amqp_bytes_t encoded,
void **decoded);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_encode_method(amqp_method_number_t methodNumber,
void *decoded,
amqp_bytes_t encoded);
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL amqp_encode_properties(uint16_t class_id,
void *decoded,
amqp_bytes_t encoded);
/* Method field records. */
#define AMQP_CONNECTION_START_METHOD ((amqp_method_number_t) 0x000A000A) /* 10, 10; 655370 */
typedef struct amqp_connection_start_t_ {
uint8_t version_major;
uint8_t version_minor;
amqp_table_t server_properties;
amqp_bytes_t mechanisms;
amqp_bytes_t locales;
} amqp_connection_start_t;
#define AMQP_CONNECTION_START_OK_METHOD ((amqp_method_number_t) 0x000A000B) /* 10, 11; 655371 */
typedef struct amqp_connection_start_ok_t_ {
amqp_table_t client_properties;
amqp_bytes_t mechanism;
amqp_bytes_t response;
amqp_bytes_t locale;
} amqp_connection_start_ok_t;
#define AMQP_CONNECTION_SECURE_METHOD ((amqp_method_number_t) 0x000A0014) /* 10, 20; 655380 */
typedef struct amqp_connection_secure_t_ {
amqp_bytes_t challenge;
} amqp_connection_secure_t;
#define AMQP_CONNECTION_SECURE_OK_METHOD ((amqp_method_number_t) 0x000A0015) /* 10, 21; 655381 */
typedef struct amqp_connection_secure_ok_t_ {
amqp_bytes_t response;
} amqp_connection_secure_ok_t;
#define AMQP_CONNECTION_TUNE_METHOD ((amqp_method_number_t) 0x000A001E) /* 10, 30; 655390 */
typedef struct amqp_connection_tune_t_ {
uint16_t channel_max;
uint32_t frame_max;
uint16_t heartbeat;
} amqp_connection_tune_t;
#define AMQP_CONNECTION_TUNE_OK_METHOD ((amqp_method_number_t) 0x000A001F) /* 10, 31; 655391 */
typedef struct amqp_connection_tune_ok_t_ {
uint16_t channel_max;
uint32_t frame_max;
uint16_t heartbeat;
} amqp_connection_tune_ok_t;
#define AMQP_CONNECTION_OPEN_METHOD ((amqp_method_number_t) 0x000A0028) /* 10, 40; 655400 */
typedef struct amqp_connection_open_t_ {
amqp_bytes_t virtual_host;
amqp_bytes_t capabilities;
amqp_boolean_t insist;
} amqp_connection_open_t;
#define AMQP_CONNECTION_OPEN_OK_METHOD ((amqp_method_number_t) 0x000A0029) /* 10, 41; 655401 */
typedef struct amqp_connection_open_ok_t_ {
amqp_bytes_t known_hosts;
} amqp_connection_open_ok_t;
#define AMQP_CONNECTION_CLOSE_METHOD ((amqp_method_number_t) 0x000A0032) /* 10, 50; 655410 */
typedef struct amqp_connection_close_t_ {
uint16_t reply_code;
amqp_bytes_t reply_text;
uint16_t class_id;
uint16_t method_id;
} amqp_connection_close_t;
#define AMQP_CONNECTION_CLOSE_OK_METHOD ((amqp_method_number_t) 0x000A0033) /* 10, 51; 655411 */
typedef struct amqp_connection_close_ok_t_ {
char dummy; /* Dummy field to avoid empty struct */
} amqp_connection_close_ok_t;
#define AMQP_CHANNEL_OPEN_METHOD ((amqp_method_number_t) 0x0014000A) /* 20, 10; 1310730 */
typedef struct amqp_channel_open_t_ {
amqp_bytes_t out_of_band;
} amqp_channel_open_t;
#define AMQP_CHANNEL_OPEN_OK_METHOD ((amqp_method_number_t) 0x0014000B) /* 20, 11; 1310731 */
typedef struct amqp_channel_open_ok_t_ {
amqp_bytes_t channel_id;
} amqp_channel_open_ok_t;
#define AMQP_CHANNEL_FLOW_METHOD ((amqp_method_number_t) 0x00140014) /* 20, 20; 1310740 */
typedef struct amqp_channel_flow_t_ {
amqp_boolean_t active;
} amqp_channel_flow_t;
#define AMQP_CHANNEL_FLOW_OK_METHOD ((amqp_method_number_t) 0x00140015) /* 20, 21; 1310741 */
typedef struct amqp_channel_flow_ok_t_ {
amqp_boolean_t active;
} amqp_channel_flow_ok_t;
#define AMQP_CHANNEL_CLOSE_METHOD ((amqp_method_number_t) 0x00140028) /* 20, 40; 1310760 */
typedef struct amqp_channel_close_t_ {
uint16_t reply_code;
amqp_bytes_t reply_text;
uint16_t class_id;
uint16_t method_id;
} amqp_channel_close_t;
#define AMQP_CHANNEL_CLOSE_OK_METHOD ((amqp_method_number_t) 0x00140029) /* 20, 41; 1310761 */
typedef struct amqp_channel_close_ok_t_ {
char dummy; /* Dummy field to avoid empty struct */
} amqp_channel_close_ok_t;
#define AMQP_ACCESS_REQUEST_METHOD ((amqp_method_number_t) 0x001E000A) /* 30, 10; 1966090 */
typedef struct amqp_access_request_t_ {
amqp_bytes_t realm;
amqp_boolean_t exclusive;
amqp_boolean_t passive;
amqp_boolean_t active;
amqp_boolean_t write;
amqp_boolean_t read;
} amqp_access_request_t;
#define AMQP_ACCESS_REQUEST_OK_METHOD ((amqp_method_number_t) 0x001E000B) /* 30, 11; 1966091 */
typedef struct amqp_access_request_ok_t_ {
uint16_t ticket;
} amqp_access_request_ok_t;
#define AMQP_EXCHANGE_DECLARE_METHOD ((amqp_method_number_t) 0x0028000A) /* 40, 10; 2621450 */
typedef struct amqp_exchange_declare_t_ {
uint16_t ticket;
amqp_bytes_t exchange;
amqp_bytes_t type;
amqp_boolean_t passive;
amqp_boolean_t durable;
amqp_boolean_t auto_delete;
amqp_boolean_t internal;
amqp_boolean_t nowait;
amqp_table_t arguments;
} amqp_exchange_declare_t;
#define AMQP_EXCHANGE_DECLARE_OK_METHOD ((amqp_method_number_t) 0x0028000B) /* 40, 11; 2621451 */
typedef struct amqp_exchange_declare_ok_t_ {
char dummy; /* Dummy field to avoid empty struct */
} amqp_exchange_declare_ok_t;
#define AMQP_EXCHANGE_DELETE_METHOD ((amqp_method_number_t) 0x00280014) /* 40, 20; 2621460 */
typedef struct amqp_exchange_delete_t_ {
uint16_t ticket;
amqp_bytes_t exchange;
amqp_boolean_t if_unused;
amqp_boolean_t nowait;
} amqp_exchange_delete_t;
#define AMQP_EXCHANGE_DELETE_OK_METHOD ((amqp_method_number_t) 0x00280015) /* 40, 21; 2621461 */
typedef struct amqp_exchange_delete_ok_t_ {
char dummy; /* Dummy field to avoid empty struct */
} amqp_exchange_delete_ok_t;
#define AMQP_EXCHANGE_BIND_METHOD ((amqp_method_number_t) 0x0028001E) /* 40, 30; 2621470 */
typedef struct amqp_exchange_bind_t_ {
uint16_t ticket;
amqp_bytes_t destination;
amqp_bytes_t source;
amqp_bytes_t routing_key;
amqp_boolean_t nowait;
amqp_table_t arguments;
} amqp_exchange_bind_t;
#define AMQP_EXCHANGE_BIND_OK_METHOD ((amqp_method_number_t) 0x0028001F) /* 40, 31; 2621471 */
typedef struct amqp_exchange_bind_ok_t_ {
char dummy; /* Dummy field to avoid empty struct */
} amqp_exchange_bind_ok_t;
#define AMQP_EXCHANGE_UNBIND_METHOD ((amqp_method_number_t) 0x00280028) /* 40, 40; 2621480 */
typedef struct amqp_exchange_unbind_t_ {
uint16_t ticket;
amqp_bytes_t destination;
amqp_bytes_t source;
amqp_bytes_t routing_key;
amqp_boolean_t nowait;
amqp_table_t arguments;
} amqp_exchange_unbind_t;
#define AMQP_EXCHANGE_UNBIND_OK_METHOD ((amqp_method_number_t) 0x00280033) /* 40, 51; 2621491 */
typedef struct amqp_exchange_unbind_ok_t_ {
char dummy; /* Dummy field to avoid empty struct */
} amqp_exchange_unbind_ok_t;
#define AMQP_QUEUE_DECLARE_METHOD ((amqp_method_number_t) 0x0032000A) /* 50, 10; 3276810 */
typedef struct amqp_queue_declare_t_ {
uint16_t ticket;
amqp_bytes_t queue;
amqp_boolean_t passive;
amqp_boolean_t durable;
amqp_boolean_t exclusive;
amqp_boolean_t auto_delete;
amqp_boolean_t nowait;
amqp_table_t arguments;
} amqp_queue_declare_t;
#define AMQP_QUEUE_DECLARE_OK_METHOD ((amqp_method_number_t) 0x0032000B) /* 50, 11; 3276811 */
typedef struct amqp_queue_declare_ok_t_ {
amqp_bytes_t queue;
uint32_t message_count;
uint32_t consumer_count;
} amqp_queue_declare_ok_t;
#define AMQP_QUEUE_BIND_METHOD ((amqp_method_number_t) 0x00320014) /* 50, 20; 3276820 */
typedef struct amqp_queue_bind_t_ {
uint16_t ticket;
amqp_bytes_t queue;
amqp_bytes_t exchange;
amqp_bytes_t routing_key;
amqp_boolean_t nowait;
amqp_table_t arguments;
} amqp_queue_bind_t;
#define AMQP_QUEUE_BIND_OK_METHOD ((amqp_method_number_t) 0x00320015) /* 50, 21; 3276821 */
typedef struct amqp_queue_bind_ok_t_ {
char dummy; /* Dummy field to avoid empty struct */
} amqp_queue_bind_ok_t;
#define AMQP_QUEUE_PURGE_METHOD ((amqp_method_number_t) 0x0032001E) /* 50, 30; 3276830 */
typedef struct amqp_queue_purge_t_ {
uint16_t ticket;
amqp_bytes_t queue;
amqp_boolean_t nowait;
} amqp_queue_purge_t;
#define AMQP_QUEUE_PURGE_OK_METHOD ((amqp_method_number_t) 0x0032001F) /* 50, 31; 3276831 */
typedef struct amqp_queue_purge_ok_t_ {
uint32_t message_count;
} amqp_queue_purge_ok_t;
#define AMQP_QUEUE_DELETE_METHOD ((amqp_method_number_t) 0x00320028) /* 50, 40; 3276840 */
typedef struct amqp_queue_delete_t_ {
uint16_t ticket;
amqp_bytes_t queue;
amqp_boolean_t if_unused;
amqp_boolean_t if_empty;
amqp_boolean_t nowait;
} amqp_queue_delete_t;
#define AMQP_QUEUE_DELETE_OK_METHOD ((amqp_method_number_t) 0x00320029) /* 50, 41; 3276841 */
typedef struct amqp_queue_delete_ok_t_ {
uint32_t message_count;
} amqp_queue_delete_ok_t;
#define AMQP_QUEUE_UNBIND_METHOD ((amqp_method_number_t) 0x00320032) /* 50, 50; 3276850 */
typedef struct amqp_queue_unbind_t_ {
uint16_t ticket;
amqp_bytes_t queue;
amqp_bytes_t exchange;
amqp_bytes_t routing_key;
amqp_table_t arguments;
} amqp_queue_unbind_t;
#define AMQP_QUEUE_UNBIND_OK_METHOD ((amqp_method_number_t) 0x00320033) /* 50, 51; 3276851 */
typedef struct amqp_queue_unbind_ok_t_ {
char dummy; /* Dummy field to avoid empty struct */
} amqp_queue_unbind_ok_t;
#define AMQP_BASIC_QOS_METHOD ((amqp_method_number_t) 0x003C000A) /* 60, 10; 3932170 */
typedef struct amqp_basic_qos_t_ {
uint32_t prefetch_size;
uint16_t prefetch_count;
amqp_boolean_t global;
} amqp_basic_qos_t;
#define AMQP_BASIC_QOS_OK_METHOD ((amqp_method_number_t) 0x003C000B) /* 60, 11; 3932171 */
typedef struct amqp_basic_qos_ok_t_ {
char dummy; /* Dummy field to avoid empty struct */
} amqp_basic_qos_ok_t;
#define AMQP_BASIC_CONSUME_METHOD ((amqp_method_number_t) 0x003C0014) /* 60, 20; 3932180 */
typedef struct amqp_basic_consume_t_ {
uint16_t ticket;
amqp_bytes_t queue;
amqp_bytes_t consumer_tag;
amqp_boolean_t no_local;
amqp_boolean_t no_ack;
amqp_boolean_t exclusive;
amqp_boolean_t nowait;
amqp_table_t arguments;
} amqp_basic_consume_t;
#define AMQP_BASIC_CONSUME_OK_METHOD ((amqp_method_number_t) 0x003C0015) /* 60, 21; 3932181 */
typedef struct amqp_basic_consume_ok_t_ {
amqp_bytes_t consumer_tag;
} amqp_basic_consume_ok_t;
#define AMQP_BASIC_CANCEL_METHOD ((amqp_method_number_t) 0x003C001E) /* 60, 30; 3932190 */
typedef struct amqp_basic_cancel_t_ {
amqp_bytes_t consumer_tag;
amqp_boolean_t nowait;
} amqp_basic_cancel_t;
#define AMQP_BASIC_CANCEL_OK_METHOD ((amqp_method_number_t) 0x003C001F) /* 60, 31; 3932191 */
typedef struct amqp_basic_cancel_ok_t_ {
amqp_bytes_t consumer_tag;
} amqp_basic_cancel_ok_t;
#define AMQP_BASIC_PUBLISH_METHOD ((amqp_method_number_t) 0x003C0028) /* 60, 40; 3932200 */
typedef struct amqp_basic_publish_t_ {
uint16_t ticket;
amqp_bytes_t exchange;
amqp_bytes_t routing_key;
amqp_boolean_t mandatory;
amqp_boolean_t immediate;
} amqp_basic_publish_t;
#define AMQP_BASIC_RETURN_METHOD ((amqp_method_number_t) 0x003C0032) /* 60, 50; 3932210 */
typedef struct amqp_basic_return_t_ {
uint16_t reply_code;
amqp_bytes_t reply_text;
amqp_bytes_t exchange;
amqp_bytes_t routing_key;
} amqp_basic_return_t;
#define AMQP_BASIC_DELIVER_METHOD ((amqp_method_number_t) 0x003C003C) /* 60, 60; 3932220 */
typedef struct amqp_basic_deliver_t_ {
amqp_bytes_t consumer_tag;
uint64_t delivery_tag;
amqp_boolean_t redelivered;
amqp_bytes_t exchange;
amqp_bytes_t routing_key;
} amqp_basic_deliver_t;
#define AMQP_BASIC_GET_METHOD ((amqp_method_number_t) 0x003C0046) /* 60, 70; 3932230 */
typedef struct amqp_basic_get_t_ {
uint16_t ticket;
amqp_bytes_t queue;
amqp_boolean_t no_ack;
} amqp_basic_get_t;
#define AMQP_BASIC_GET_OK_METHOD ((amqp_method_number_t) 0x003C0047) /* 60, 71; 3932231 */
typedef struct amqp_basic_get_ok_t_ {
uint64_t delivery_tag;
amqp_boolean_t redelivered;
amqp_bytes_t exchange;
amqp_bytes_t routing_key;
uint32_t message_count;
} amqp_basic_get_ok_t;
#define AMQP_BASIC_GET_EMPTY_METHOD ((amqp_method_number_t) 0x003C0048) /* 60, 72; 3932232 */
typedef struct amqp_basic_get_empty_t_ {
amqp_bytes_t cluster_id;
} amqp_basic_get_empty_t;
#define AMQP_BASIC_ACK_METHOD ((amqp_method_number_t) 0x003C0050) /* 60, 80; 3932240 */
typedef struct amqp_basic_ack_t_ {
uint64_t delivery_tag;
amqp_boolean_t multiple;
} amqp_basic_ack_t;
#define AMQP_BASIC_REJECT_METHOD ((amqp_method_number_t) 0x003C005A) /* 60, 90; 3932250 */
typedef struct amqp_basic_reject_t_ {
uint64_t delivery_tag;
amqp_boolean_t requeue;
} amqp_basic_reject_t;
#define AMQP_BASIC_RECOVER_ASYNC_METHOD ((amqp_method_number_t) 0x003C0064) /* 60, 100; 3932260 */
typedef struct amqp_basic_recover_async_t_ {
amqp_boolean_t requeue;
} amqp_basic_recover_async_t;
#define AMQP_BASIC_RECOVER_METHOD ((amqp_method_number_t) 0x003C006E) /* 60, 110; 3932270 */
typedef struct amqp_basic_recover_t_ {
amqp_boolean_t requeue;
} amqp_basic_recover_t;
#define AMQP_BASIC_RECOVER_OK_METHOD ((amqp_method_number_t) 0x003C006F) /* 60, 111; 3932271 */
typedef struct amqp_basic_recover_ok_t_ {
char dummy; /* Dummy field to avoid empty struct */
} amqp_basic_recover_ok_t;
#define AMQP_BASIC_NACK_METHOD ((amqp_method_number_t) 0x003C0078) /* 60, 120; 3932280 */
typedef struct amqp_basic_nack_t_ {
uint64_t delivery_tag;
amqp_boolean_t multiple;
amqp_boolean_t requeue;
} amqp_basic_nack_t;
#define AMQP_TX_SELECT_METHOD ((amqp_method_number_t) 0x005A000A) /* 90, 10; 5898250 */
typedef struct amqp_tx_select_t_ {
char dummy; /* Dummy field to avoid empty struct */
} amqp_tx_select_t;
#define AMQP_TX_SELECT_OK_METHOD ((amqp_method_number_t) 0x005A000B) /* 90, 11; 5898251 */
typedef struct amqp_tx_select_ok_t_ {
char dummy; /* Dummy field to avoid empty struct */
} amqp_tx_select_ok_t;
#define AMQP_TX_COMMIT_METHOD ((amqp_method_number_t) 0x005A0014) /* 90, 20; 5898260 */
typedef struct amqp_tx_commit_t_ {
char dummy; /* Dummy field to avoid empty struct */
} amqp_tx_commit_t;
#define AMQP_TX_COMMIT_OK_METHOD ((amqp_method_number_t) 0x005A0015) /* 90, 21; 5898261 */
typedef struct amqp_tx_commit_ok_t_ {
char dummy; /* Dummy field to avoid empty struct */
} amqp_tx_commit_ok_t;
#define AMQP_TX_ROLLBACK_METHOD ((amqp_method_number_t) 0x005A001E) /* 90, 30; 5898270 */
typedef struct amqp_tx_rollback_t_ {
char dummy; /* Dummy field to avoid empty struct */
} amqp_tx_rollback_t;
#define AMQP_TX_ROLLBACK_OK_METHOD ((amqp_method_number_t) 0x005A001F) /* 90, 31; 5898271 */
typedef struct amqp_tx_rollback_ok_t_ {
char dummy; /* Dummy field to avoid empty struct */
} amqp_tx_rollback_ok_t;
#define AMQP_CONFIRM_SELECT_METHOD ((amqp_method_number_t) 0x0055000A) /* 85, 10; 5570570 */
typedef struct amqp_confirm_select_t_ {
amqp_boolean_t nowait;
} amqp_confirm_select_t;
#define AMQP_CONFIRM_SELECT_OK_METHOD ((amqp_method_number_t) 0x0055000B) /* 85, 11; 5570571 */
typedef struct amqp_confirm_select_ok_t_ {
char dummy; /* Dummy field to avoid empty struct */
} amqp_confirm_select_ok_t;
/* Class property records. */
#define AMQP_CONNECTION_CLASS (0x000A) /* 10 */
typedef struct amqp_connection_properties_t_ {
amqp_flags_t _flags;
char dummy; /* Dummy field to avoid empty struct */
} amqp_connection_properties_t;
#define AMQP_CHANNEL_CLASS (0x0014) /* 20 */
typedef struct amqp_channel_properties_t_ {
amqp_flags_t _flags;
char dummy; /* Dummy field to avoid empty struct */
} amqp_channel_properties_t;
#define AMQP_ACCESS_CLASS (0x001E) /* 30 */
typedef struct amqp_access_properties_t_ {
amqp_flags_t _flags;
char dummy; /* Dummy field to avoid empty struct */
} amqp_access_properties_t;
#define AMQP_EXCHANGE_CLASS (0x0028) /* 40 */
typedef struct amqp_exchange_properties_t_ {
amqp_flags_t _flags;
char dummy; /* Dummy field to avoid empty struct */
} amqp_exchange_properties_t;
#define AMQP_QUEUE_CLASS (0x0032) /* 50 */
typedef struct amqp_queue_properties_t_ {
amqp_flags_t _flags;
char dummy; /* Dummy field to avoid empty struct */
} amqp_queue_properties_t;
#define AMQP_BASIC_CLASS (0x003C) /* 60 */
#define AMQP_BASIC_CONTENT_TYPE_FLAG (1 << 15)
#define AMQP_BASIC_CONTENT_ENCODING_FLAG (1 << 14)
#define AMQP_BASIC_HEADERS_FLAG (1 << 13)
#define AMQP_BASIC_DELIVERY_MODE_FLAG (1 << 12)
#define AMQP_BASIC_PRIORITY_FLAG (1 << 11)
#define AMQP_BASIC_CORRELATION_ID_FLAG (1 << 10)
#define AMQP_BASIC_REPLY_TO_FLAG (1 << 9)
#define AMQP_BASIC_EXPIRATION_FLAG (1 << 8)
#define AMQP_BASIC_MESSAGE_ID_FLAG (1 << 7)
#define AMQP_BASIC_TIMESTAMP_FLAG (1 << 6)
#define AMQP_BASIC_TYPE_FLAG (1 << 5)
#define AMQP_BASIC_USER_ID_FLAG (1 << 4)
#define AMQP_BASIC_APP_ID_FLAG (1 << 3)
#define AMQP_BASIC_CLUSTER_ID_FLAG (1 << 2)
typedef struct amqp_basic_properties_t_ {
amqp_flags_t _flags;
amqp_bytes_t content_type;
amqp_bytes_t content_encoding;
amqp_table_t headers;
uint8_t delivery_mode;
uint8_t priority;
amqp_bytes_t correlation_id;
amqp_bytes_t reply_to;
amqp_bytes_t expiration;
amqp_bytes_t message_id;
uint64_t timestamp;
amqp_bytes_t type;
amqp_bytes_t user_id;
amqp_bytes_t app_id;
amqp_bytes_t cluster_id;
} amqp_basic_properties_t;
#define AMQP_TX_CLASS (0x005A) /* 90 */
typedef struct amqp_tx_properties_t_ {
amqp_flags_t _flags;
char dummy; /* Dummy field to avoid empty struct */
} amqp_tx_properties_t;
#define AMQP_CONFIRM_CLASS (0x0055) /* 85 */
typedef struct amqp_confirm_properties_t_ {
amqp_flags_t _flags;
char dummy; /* Dummy field to avoid empty struct */
} amqp_confirm_properties_t;
/* API functions for methods */
AMQP_PUBLIC_FUNCTION amqp_channel_open_ok_t * AMQP_CALL amqp_channel_open(amqp_connection_state_t state, amqp_channel_t channel);
AMQP_PUBLIC_FUNCTION amqp_channel_flow_ok_t * AMQP_CALL amqp_channel_flow(amqp_connection_state_t state, amqp_channel_t channel, amqp_boolean_t active);
AMQP_PUBLIC_FUNCTION amqp_exchange_declare_ok_t * AMQP_CALL amqp_exchange_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_bytes_t type, amqp_boolean_t passive, amqp_boolean_t durable, amqp_table_t arguments);
AMQP_PUBLIC_FUNCTION amqp_exchange_delete_ok_t * AMQP_CALL amqp_exchange_delete(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t exchange, amqp_boolean_t if_unused);
AMQP_PUBLIC_FUNCTION amqp_exchange_bind_ok_t * AMQP_CALL amqp_exchange_bind(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t destination, amqp_bytes_t source, amqp_bytes_t routing_key, amqp_table_t arguments);
AMQP_PUBLIC_FUNCTION amqp_exchange_unbind_ok_t * AMQP_CALL amqp_exchange_unbind(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t destination, amqp_bytes_t source, amqp_bytes_t routing_key, amqp_table_t arguments);
AMQP_PUBLIC_FUNCTION amqp_queue_declare_ok_t * AMQP_CALL amqp_queue_declare(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_boolean_t passive, amqp_boolean_t durable, amqp_boolean_t exclusive, amqp_boolean_t auto_delete, amqp_table_t arguments);
AMQP_PUBLIC_FUNCTION amqp_queue_bind_ok_t * AMQP_CALL amqp_queue_bind(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_table_t arguments);
AMQP_PUBLIC_FUNCTION amqp_queue_purge_ok_t * AMQP_CALL amqp_queue_purge(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue);
AMQP_PUBLIC_FUNCTION amqp_queue_delete_ok_t * AMQP_CALL amqp_queue_delete(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_boolean_t if_unused, amqp_boolean_t if_empty);
AMQP_PUBLIC_FUNCTION amqp_queue_unbind_ok_t * AMQP_CALL amqp_queue_unbind(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t exchange, amqp_bytes_t routing_key, amqp_table_t arguments);
AMQP_PUBLIC_FUNCTION amqp_basic_qos_ok_t * AMQP_CALL amqp_basic_qos(amqp_connection_state_t state, amqp_channel_t channel, uint32_t prefetch_size, uint16_t prefetch_count, amqp_boolean_t global);
AMQP_PUBLIC_FUNCTION amqp_basic_consume_ok_t * AMQP_CALL amqp_basic_consume(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t queue, amqp_bytes_t consumer_tag, amqp_boolean_t no_local, amqp_boolean_t no_ack, amqp_boolean_t exclusive, amqp_table_t arguments);
AMQP_PUBLIC_FUNCTION amqp_basic_cancel_ok_t * AMQP_CALL amqp_basic_cancel(amqp_connection_state_t state, amqp_channel_t channel, amqp_bytes_t consumer_tag);
AMQP_PUBLIC_FUNCTION amqp_basic_recover_ok_t * AMQP_CALL amqp_basic_recover(amqp_connection_state_t state, amqp_channel_t channel, amqp_boolean_t requeue);
AMQP_PUBLIC_FUNCTION amqp_tx_select_ok_t * AMQP_CALL amqp_tx_select(amqp_connection_state_t state, amqp_channel_t channel);
AMQP_PUBLIC_FUNCTION amqp_tx_commit_ok_t * AMQP_CALL amqp_tx_commit(amqp_connection_state_t state, amqp_channel_t channel);
AMQP_PUBLIC_FUNCTION amqp_tx_rollback_ok_t * AMQP_CALL amqp_tx_rollback(amqp_connection_state_t state, amqp_channel_t channel);
AMQP_PUBLIC_FUNCTION amqp_confirm_select_ok_t * AMQP_CALL amqp_confirm_select(amqp_connection_state_t state, amqp_channel_t channel);
AMQP_END_DECLS
#endif /* AMQP_FRAMING_H */

View File

@ -0,0 +1,161 @@
/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
/** \file */
/*
* Copyright 2012-2013 Michael Steinert
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*/
#ifndef AMQP_SSL_H
#define AMQP_SSL_H
#include <amqp.h>
AMQP_BEGIN_DECLS
/**
* Create a new SSL/TLS socket object.
*
* The returned socket object is owned by the \ref amqp_connection_state_t object
* and will be destroyed when the state object is destroyed or a new socket
* object is created.
*
* If the socket object creation fails, the \ref amqp_connection_state_t object
* will not be changed.
*
* The object returned by this function can be retrieved from the
* amqp_connection_state_t object later using the amqp_get_socket() function.
*
* Calling this function may result in the underlying SSL library being initialized.
* \sa amqp_set_initialize_ssl_library()
*
* \param [in,out] state The connection object that owns the SSL/TLS socket
* \return A new socket object or NULL if an error occurred.
*
* \since v0.4.0
*/
AMQP_PUBLIC_FUNCTION
amqp_socket_t *
AMQP_CALL
amqp_ssl_socket_new(amqp_connection_state_t state);
/**
* Set the CA certificate.
*
* \param [in,out] self An SSL/TLS socket object.
* \param [in] cacert Path to the CA cert file in PEM format.
*
* \return \ref AMQP_STATUS_OK on success an enum
*
* \since v0.4.0
*/
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL
amqp_ssl_socket_set_cacert(amqp_socket_t *self,
const char *cacert);
/**
* Set the client key.
*
* \param [in,out] self An SSL/TLS socket object.
* \param [in] cert Path to the client certificate in PEM foramt.
* \param [in] key Path to the client key in PEM format.
*
* \return Zero if successful, -1 otherwise.
*
* \since v0.4.0
*/
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL
amqp_ssl_socket_set_key(amqp_socket_t *self,
const char *cert,
const char *key);
/**
* Set the client key from a buffer.
*
* \param [in,out] self An SSL/TLS socket object.
* \param [in] cert Path to the client certificate in PEM foramt.
* \param [in] key A buffer containing client key in PEM format.
* \param [in] n The length of the buffer.
*
* \return Zero if successful, -1 otherwise.
*
* \since v0.4.0
*/
AMQP_PUBLIC_FUNCTION
int
AMQP_CALL
amqp_ssl_socket_set_key_buffer(amqp_socket_t *self,
const char *cert,
const void *key,
size_t n);
/**
* Enable or disable peer verification.
*
* If peer verification is enabled then the common name in the server
* certificate must match the server name. Peer verification is enabled by
* default.
*
* \param [in,out] self An SSL/TLS socket object.
* \param [in] verify Enable or disable peer verification.
*
* \since v0.4.0
*/
AMQP_PUBLIC_FUNCTION
void
AMQP_CALL
amqp_ssl_socket_set_verify(amqp_socket_t *self,
amqp_boolean_t verify);
/**
* Sets whether rabbitmq-c initializes the underlying SSL library.
*
* For SSL libraries that require a one-time initialization across
* a whole program (e.g., OpenSSL) this sets whether or not rabbitmq-c
* will initialize the SSL library when the first call to
* amqp_open_socket() is made. You should call this function with
* do_init = 0 if the underlying SSL library is initialized somewhere else
* the program.
*
* Failing to initialize or double initialization of the SSL library will
* result in undefined behavior
*
* By default rabbitmq-c will initialize the underlying SSL library
*
* NOTE: calling this function after the first socket has been opened with
* amqp_open_socket() will not have any effect.
*
* \param [in] do_initalize If 0 rabbitmq-c will not initialize the SSL
* library, otherwise rabbitmq-c will initialize the
* SSL library
*
* \since v0.4.0
*/
AMQP_PUBLIC_FUNCTION
void
AMQP_CALL
amqp_set_initialize_ssl_library(amqp_boolean_t do_initialize);
AMQP_END_DECLS
#endif /* AMQP_SSL_H */

View File

@ -0,0 +1,69 @@
/* vim:set ft=c ts=2 sw=2 sts=2 et cindent: */
/** \file */
/*
* Copyright 2012-2013 Michael Steinert
*
* Permission is hereby granted, free of charge, to any person obtaining a
* copy of this software and associated documentation files (the "Software"),
* to deal in the Software without restriction, including without limitation
* the rights to use, copy, modify, merge, publish, distribute, sublicense,
* and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
* DEALINGS IN THE SOFTWARE.
*/
/**
* A TCP socket connection.
*/
#ifndef AMQP_TCP_SOCKET_H
#define AMQP_TCP_SOCKET_H
#include <amqp.h>
AMQP_BEGIN_DECLS
/**
* Create a new TCP socket.
*
* Call amqp_socket_close() to release socket resources.
*
* \return A new socket object or NULL if an error occurred.
*
* \since v0.4.0
*/
AMQP_PUBLIC_FUNCTION
amqp_socket_t *
AMQP_CALL
amqp_tcp_socket_new(amqp_connection_state_t state);
/**
* Assign an open file descriptor to a socket object.
*
* This function must not be used in conjunction with amqp_socket_open(), i.e.
* the socket connection should already be open(2) when this function is
* called.
*
* \param [in,out] self A TCP socket object.
* \param [in] sockfd An open socket descriptor.
*
* \since v0.4.0
*/
AMQP_PUBLIC_FUNCTION
void
AMQP_CALL
amqp_tcp_socket_set_sockfd(amqp_socket_t *base, int sockfd);
AMQP_END_DECLS
#endif /* AMQP_TCP_SOCKET_H */

1641
c-demo-agent/lcfg_static.c Normal file

File diff suppressed because it is too large Load Diff

146
c-demo-agent/lcfg_static.h Normal file
View File

@ -0,0 +1,146 @@
/* This file is an autogenerated single-file version of liblcfg.
* It is recommended that you update this file on a regular
* basis from the original liblcfg distribution package.
*
* The most recent version of liblcfg is available at
* <http://liblcfg.carnivore.it>
*/
/*
Copyright (c) 2012, Paul Baecher
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of the <organization> nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL <THE COPYRIGHT HOLDER> BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef LCFG_H
#define LCFG_H
#include <stdlib.h>
struct lcfg;
enum lcfg_status { lcfg_status_ok, lcfg_status_error };
typedef enum lcfg_status (*lcfg_visitor_function)(const char *key, void *data, size_t size, void *user_data);
/* open a new config file */
struct lcfg * lcfg_new(const char *filename);
/* parse config into memory */
enum lcfg_status lcfg_parse(struct lcfg *);
/* visit all configuration elements */
enum lcfg_status lcfg_accept(struct lcfg *, lcfg_visitor_function, void *);
/* access a value by path */
enum lcfg_status lcfg_value_get(struct lcfg *, const char *, void **, size_t *);
/* return the last error message */
const char * lcfg_error_get(struct lcfg *);
/* set error */
void lcfg_error_set(struct lcfg *, const char *fmt, ...);
/* destroy lcfg context */
void lcfg_delete(struct lcfg *);
#endif
/*
Copyright (c) 2012, Paul Baecher
All rights reserved.
Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:
* Redistributions of source code must retain the above copyright
notice, this list of conditions and the following disclaimer.
* Redistributions in binary form must reproduce the above copyright
notice, this list of conditions and the following disclaimer in the
documentation and/or other materials provided with the distribution.
* Neither the name of the <organization> nor the
names of its contributors may be used to endorse or promote products
derived from this software without specific prior written permission.
THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
DISCLAIMED. IN NO EVENT SHALL <THE COPYRIGHT HOLDER> BE LIABLE FOR ANY
DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES
(INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES;
LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
(INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS
SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*/
#ifndef LCFGX_TREE_H
#define LCFGX_TREE_H
enum lcfgx_type
{
lcfgx_string,
lcfgx_list,
lcfgx_map,
};
struct lcfgx_tree_node
{
enum lcfgx_type type;
char *key; /* NULL for root node */
union
{
struct
{
void *data;
size_t len;
} string;
struct lcfgx_tree_node *elements; /* in case of list or map type */
} value;
struct lcfgx_tree_node *next;
};
struct lcfgx_tree_node *lcfgx_tree_new(struct lcfg *);
void lcfgx_tree_delete(struct lcfgx_tree_node *);
void lcfgx_tree_dump(struct lcfgx_tree_node *node, int depth);
enum lcfgx_path_access
{
LCFGX_PATH_NOT_FOUND,
LCFGX_PATH_FOUND_WRONG_TYPE_BAD,
LCFGX_PATH_FOUND_TYPE_OK,
};
extern const char *lcfgx_path_access_strings[];
enum lcfgx_path_access lcfgx_get(struct lcfgx_tree_node *root, struct lcfgx_tree_node **n, const char *key, enum lcfgx_type type);
enum lcfgx_path_access lcfgx_get_list(struct lcfgx_tree_node *root, struct lcfgx_tree_node **n, const char *key);
enum lcfgx_path_access lcfgx_get_map(struct lcfgx_tree_node *root, struct lcfgx_tree_node **n, const char *key);
enum lcfgx_path_access lcfgx_get_string(struct lcfgx_tree_node *root, struct lcfgx_tree_node **n, const char *key);
#endif

BIN
c-demo-agent/librabbitmq.a Normal file

Binary file not shown.

239
c-demo-agent/murano-agent.c Normal file
View File

@ -0,0 +1,239 @@
#include <stdlib.h>
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <stdint.h>
#include <assert.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include "utils.h"
#include "lcfg_static.h"
static void run ( amqp_connection_state_t conn, int log_fd , const char *result_routing_key )
{
int received = 0;
amqp_frame_t frame;
while ( 1 )
{
amqp_rpc_reply_t ret;
amqp_envelope_t envelope;
amqp_maybe_release_buffers ( conn );
ret = amqp_consume_message ( conn, &envelope, NULL, 0 );
if ( AMQP_RESPONSE_NORMAL == ret.reply_type )
{
int i;
amqp_bytes_t body = envelope.message.body;
const char *title = "A new message received:\n";
fprintf ( stdout, title, received );
for ( i = 0; i < body.len; i++ )
{
fprintf ( stdout, "%c", * ( char* ) ( body.bytes + i ) );
}
puts ( "\n" );
write ( log_fd, ( void * ) title, strlen ( title ) );
write ( log_fd, body.bytes, body.len );
write ( log_fd, ( void * ) "\n\n", 2 );
/* Send a reply. */
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_MESSAGE_ID_FLAG;
printf("message id: %s", (const char*)envelope.message.properties.message_id.bytes);
props.message_id = amqp_bytes_malloc_dup ( envelope.message.properties.message_id );
props.content_type = amqp_cstring_bytes ( "text/json" );
props.delivery_mode = 2; /* persistent delivery mode */
const char *result_body = "{\"IsException\": false, \"Result\": [{\"IsException\": false, \"Result\": []}]}";
die_on_error ( amqp_basic_publish ( conn,
1,
amqp_cstring_bytes ( "" ),
amqp_cstring_bytes ( result_routing_key ),
0,
0,
&props,
amqp_cstring_bytes ( result_body ) ),
"Publishing" );
amqp_destroy_envelope ( &envelope );
}
else
{
if ( AMQP_RESPONSE_LIBRARY_EXCEPTION == ret.reply_type &&
AMQP_STATUS_UNEXPECTED_STATE == ret.library_error )
{
if ( AMQP_STATUS_OK != amqp_simple_wait_frame ( conn, &frame ) )
{
return;
}
if ( AMQP_FRAME_METHOD == frame.frame_type )
{
switch ( frame.payload.method.id )
{
case AMQP_BASIC_ACK_METHOD:
/* if we've turned publisher confirms on, and we've published a message
* here is a message being confirmed
*/
break;
case AMQP_BASIC_RETURN_METHOD:
/* if a published message couldn't be routed and the mandatory flag was set
* this is what would be returned. The message then needs to be read.
*/
{
amqp_message_t message;
ret = amqp_read_message ( conn, frame.channel, &message, 0 );
if ( AMQP_RESPONSE_NORMAL != ret.reply_type )
{
return;
}
amqp_destroy_message ( &message );
}
break;
case AMQP_CHANNEL_CLOSE_METHOD:
/* a channel.close method happens when a channel exception occurs, this
* can happen by publishing to an exchange that doesn't exist for example
*
* In this case you would need to open another channel redeclare any queues
* that were declared auto-delete, and restart any consumers that were attached
* to the previous channel
*/
return;
case AMQP_CONNECTION_CLOSE_METHOD:
/* a connection.close method happens when a connection exception occurs,
* this can happen by trying to use a channel that isn't open for example.
*
* In this case the whole connection must be restarted.
*/
return;
default:
fprintf ( stderr ,"An unexpected method was received %d\n", frame.payload.method.id );
return;
}
}
}
}
received++;
}
}
static const char* get_config_value ( struct lcfg *cfg, const char *key, int verbose )
{
void *data;
size_t len;
if ( lcfg_value_get ( cfg, key, &data, &len ) != lcfg_status_ok )
{
fprintf ( stderr, "Key %s is not found in the configuration file", key );
}
const char *val = ( const char * ) data;
if ( verbose )
{
fprintf ( stdout, "%s = %s\n", key, val );
}
return val;
}
int main ( int argc, char const *const *argv )
{
if ( argc != 3 )
{
printf ( "usage: %s CFG_FILE LOG_FILE\n", argv[0] );
return -1;
}
const char *log_filename = argv[2];
int flags = O_CREAT | O_APPEND | O_RDWR;
mode_t mode = S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP;
int log_fd = open ( log_filename, flags, mode );
if ( log_fd < 0 )
{
fprintf ( stderr, "ERROR: Falied to open the log file '%s'\n", log_filename );
exit ( 1 );
}
/* Read the configuration file. */
struct lcfg *cfg = lcfg_new ( argv[1] );
if ( lcfg_parse ( cfg ) != lcfg_status_ok )
{
printf ( "lcfg error: %s\n", lcfg_error_get ( cfg ) );
return -1;
}
/* Read all the configuration parameters. */
fprintf ( stdout, "Starting Murano agent with the following configuration:\n\n" );
const char *host = get_config_value ( cfg, "RABBITMQ_HOST" , 1 );
int port = atoi ( get_config_value ( cfg, "RABBITMQ_PORT" , 1 ) );
const char *vhost = get_config_value ( cfg, "RABBITMQ_VHOST" , 1 );
const char *username = get_config_value ( cfg, "RABBITMQ_USERNAME" , 1 );
const char *password = get_config_value ( cfg, "RABBITMQ_PASSWORD" , 1 );
const char *queuename = get_config_value ( cfg, "RABBITMQ_INPUT_QUEUE" , 1 );
const char *result_routing_key = get_config_value ( cfg, "RABBITMQ_RESULT_ROUTING_KEY", 1 );
amqp_connection_state_t conn = amqp_new_connection();
amqp_socket_t *socket = NULL;
amqp_bytes_t queuename_bytes = amqp_cstring_bytes ( queuename );
socket = amqp_tcp_socket_new ( conn );
if ( !socket )
{
die ( "creating TCP socket" );
}
if ( amqp_socket_open ( socket, host, port ) )
{
die ( "opening TCP socket" );
}
die_on_amqp_error ( amqp_login ( conn, vhost, 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, username, password ),
"Logging in" );
amqp_channel_open ( conn, 1 );
die_on_amqp_error ( amqp_get_rpc_reply ( conn ), "Opening channel" );
amqp_basic_consume ( conn, 1, queuename_bytes, amqp_empty_bytes, 0, 1, 0, amqp_empty_table );
die_on_amqp_error ( amqp_get_rpc_reply ( conn ), "Consuming" );
puts ( "\nSuccessfully connected to Rabbit MQ server! Ready for messages..." );
run ( conn, log_fd , result_routing_key );
close ( log_fd );
lcfg_delete ( cfg );
die_on_amqp_error ( amqp_channel_close ( conn, 1, AMQP_REPLY_SUCCESS ), "Closing channel" );
die_on_amqp_error ( amqp_connection_close ( conn, AMQP_REPLY_SUCCESS ), "Closing connection" );
die_on_error ( amqp_destroy_connection ( conn ), "Ending connection" );
return 0;
}

View File

@ -0,0 +1,7 @@
RABBITMQ_HOST = "localhost"
RABBITMQ_PORT = "5672"
RABBITMQ_USERNAME = "guest"
RABBITMQ_PASSWORD = "guest"
RABBITMQ_VHOST = "/"
RABBITMQ_INPUT_QUEUE = "test queue"
RABBITMQ_RESULT_ROUTING_KEY = "result queue"

128
c-demo-agent/producer.c Normal file
View File

@ -0,0 +1,128 @@
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <stdint.h>
#include <amqp_tcp_socket.h>
#include <amqp.h>
#include <amqp_framing.h>
#include "utils.h"
#define SUMMARY_EVERY_US 1000000
static void send_batch ( amqp_connection_state_t conn,
char const *queue_name,
int rate_limit,
int message_count )
{
uint64_t start_time = now_microseconds();
int i;
int sent = 0;
int previous_sent = 0;
uint64_t previous_report_time = start_time;
uint64_t next_summary_time = start_time + SUMMARY_EVERY_US;
const char *message = "Hello from the producer!";
amqp_bytes_t message_bytes;
message_bytes.len = strlen ( message );
message_bytes.bytes = ( void * ) message;
for ( i = 0; i < message_count; i++ )
{
uint64_t now = now_microseconds();
amqp_basic_properties_t props;
props._flags = AMQP_BASIC_CONTENT_TYPE_FLAG | AMQP_BASIC_DELIVERY_MODE_FLAG | AMQP_BASIC_MESSAGE_ID_FLAG;
props.message_id = amqp_cstring_bytes( " msg_id " );
props.content_type = amqp_cstring_bytes ( "text/json" );
props.delivery_mode = 2; /* persistent delivery mode */
die_on_error ( amqp_basic_publish ( conn,
1,
amqp_cstring_bytes ( "" ),
amqp_cstring_bytes ( queue_name ),
0,
0,
&props,
message_bytes ),
"Publishing" );
sent++;
if ( now > next_summary_time )
{
int countOverInterval = sent - previous_sent;
double intervalRate = countOverInterval / ( ( now - previous_report_time ) / 1000000.0 );
printf ( "%d ms: Sent %d - %d since last report (%d Hz)\n",
( int ) ( now - start_time ) / 1000, sent, countOverInterval, ( int ) intervalRate );
previous_sent = sent;
previous_report_time = now;
next_summary_time += SUMMARY_EVERY_US;
}
while ( ( ( i * 1000000.0 ) / ( now - start_time ) ) > rate_limit )
{
microsleep ( 2000 );
now = now_microseconds();
}
}
{
uint64_t stop_time = now_microseconds();
int total_delta = stop_time - start_time;
printf ( "PRODUCER - Message count: %d\n", message_count );
printf ( "Total time, milliseconds: %d\n", total_delta / 1000 );
printf ( "Overall messages-per-second: %g\n", ( message_count / ( total_delta / 1000000.0 ) ) );
}
}
int main ( int argc, char const *const *argv )
{
char const *hostname;
int port, status;
int rate_limit;
int message_count;
amqp_socket_t *socket = NULL;
amqp_connection_state_t conn;
if ( argc < 5 )
{
fprintf ( stderr, "Usage: producer host port rate_limit message_count\n" );
return 1;
}
hostname = argv[1];
port = atoi ( argv[2] );
rate_limit = atoi ( argv[3] );
message_count = atoi ( argv[4] );
conn = amqp_new_connection();
socket = amqp_tcp_socket_new ( conn );
if ( !socket )
{
die ( "creating TCP socket" );
}
status = amqp_socket_open ( socket, hostname, port );
if ( status )
{
die ( "opening TCP socket" );
}
die_on_amqp_error ( amqp_login ( conn, "/", 0, 131072, 0, AMQP_SASL_METHOD_PLAIN, "guest", "guest" ),
"Logging in" );
amqp_channel_open ( conn, 1 );
die_on_amqp_error ( amqp_get_rpc_reply ( conn ), "Opening channel" );
send_batch ( conn, "test queue", rate_limit, message_count );
die_on_amqp_error ( amqp_channel_close ( conn, 1, AMQP_REPLY_SUCCESS ), "Closing channel" );
die_on_amqp_error ( amqp_connection_close ( conn, AMQP_REPLY_SUCCESS ), "Closing connection" );
die_on_error ( amqp_destroy_connection ( conn ), "Ending connection" );
return 0;
}

201
c-demo-agent/utils.c Normal file
View File

@ -0,0 +1,201 @@
#include <stdarg.h>
#include <stdlib.h>
#include <stdio.h>
#include <string.h>
#include <ctype.h>
#include <stdint.h>
#include <amqp.h>
#include <amqp_framing.h>
#include "utils.h"
/* For usleep */
/* #define _BSD_SOURCE */
#include <stdint.h>
#include <sys/time.h>
#include <unistd.h>
uint64_t now_microseconds ( void )
{
struct timeval tv;
gettimeofday ( &tv, NULL );
return ( uint64_t ) tv.tv_sec * 1000000 + ( uint64_t ) tv.tv_usec;
}
void microsleep ( int usec )
{
usleep ( usec );
}
void die ( const char *fmt, ... )
{
va_list ap;
va_start ( ap, fmt );
vfprintf ( stderr, fmt, ap );
va_end ( ap );
fprintf ( stderr, "\n" );
exit ( 1 );
}
void die_on_error ( int x, char const *context )
{
if ( x < 0 )
{
fprintf ( stderr, "%s: %s\n", context, amqp_error_string2 ( x ) );
exit ( 1 );
}
}
void die_on_amqp_error ( amqp_rpc_reply_t x, char const *context )
{
switch ( x.reply_type )
{
case AMQP_RESPONSE_NORMAL:
return;
case AMQP_RESPONSE_NONE:
fprintf ( stderr, "%s: missing RPC reply type!\n", context );
break;
case AMQP_RESPONSE_LIBRARY_EXCEPTION:
fprintf ( stderr, "%s: %s\n", context, amqp_error_string2 ( x.library_error ) );
break;
case AMQP_RESPONSE_SERVER_EXCEPTION:
switch ( x.reply.id )
{
case AMQP_CONNECTION_CLOSE_METHOD:
{
amqp_connection_close_t *m = ( amqp_connection_close_t * ) x.reply.decoded;
fprintf ( stderr, "%s: server connection error %d, message: %.*s\n",
context,
m->reply_code,
( int ) m->reply_text.len, ( char * ) m->reply_text.bytes );
break;
}
case AMQP_CHANNEL_CLOSE_METHOD:
{
amqp_channel_close_t *m = ( amqp_channel_close_t * ) x.reply.decoded;
fprintf ( stderr, "%s: server channel error %d, message: %.*s\n",
context,
m->reply_code,
( int ) m->reply_text.len, ( char * ) m->reply_text.bytes );
break;
}
default:
fprintf ( stderr, "%s: unknown server error, method id 0x%08X\n", context, x.reply.id );
break;
}
break;
}
exit ( 1 );
}
static void dump_row ( long count, int numinrow, int *chs )
{
int i;
printf ( "%08lX:", count - numinrow );
if ( numinrow > 0 )
{
for ( i = 0; i < numinrow; i++ )
{
if ( i == 8 )
{
printf ( " :" );
}
printf ( " %02X", chs[i] );
}
for ( i = numinrow; i < 16; i++ )
{
if ( i == 8 )
{
printf ( " :" );
}
printf ( " " );
}
printf ( " " );
for ( i = 0; i < numinrow; i++ )
{
if ( isprint ( chs[i] ) )
{
printf ( "%c", chs[i] );
}
else
{
printf ( "." );
}
}
}
printf ( "\n" );
}
static int rows_eq ( int *a, int *b )
{
int i;
for ( i=0; i<16; i++ )
if ( a[i] != b[i] )
{
return 0;
}
return 1;
}
void amqp_dump ( void const *buffer, size_t len )
{
unsigned char *buf = ( unsigned char * ) buffer;
long count = 0;
int numinrow = 0;
int chs[16];
int oldchs[16] = {0};
int showed_dots = 0;
size_t i;
for ( i = 0; i < len; i++ )
{
int ch = buf[i];
if ( numinrow == 16 )
{
int i;
if ( rows_eq ( oldchs, chs ) )
{
if ( !showed_dots )
{
showed_dots = 1;
printf ( " .. .. .. .. .. .. .. .. : .. .. .. .. .. .. .. ..\n" );
}
}
else
{
showed_dots = 0;
dump_row ( count, numinrow, chs );
}
for ( i=0; i<16; i++ )
{
oldchs[i] = chs[i];
}
numinrow = 0;
}
count++;
chs[numinrow++] = ch;
}
dump_row ( count, numinrow, chs );
if ( numinrow != 0 )
{
printf ( "%08lX:\n", count );
}
}

13
c-demo-agent/utils.h Normal file
View File

@ -0,0 +1,13 @@
#ifndef agent_utils_h
#define agent_utils_h
void die(const char *fmt, ...);
extern void die_on_error(int x, char const *context);
extern void die_on_amqp_error(amqp_rpc_reply_t x, char const *context);
extern void amqp_dump(void const *buffer, size_t len);
extern uint64_t now_microseconds(void);
extern void microsleep(int usec);
#endif