Dynamic bridge - multi pattern (solution)
Hello,
There are some limitations in the bridge JSON format, because the code does not allow adding multiple topic patterns in JSON format. So I changed the bridge_dynamic.c file (the bridge__dynamic_parse_payload_new_json function) to allow adding multiple patterns to a bridge. Additionally, I wrote functionality that allows specifying local_clientid and remote_clientid via JSON (and also in the normal, non-JSON parsing). You can check my code and consider merging this functionality into the dynamic-bridge branch.
The JSON format after the changes: mosquitto_pub -h 127.0.0.1 -p 1883 -t '$BRIDGE/new' -m '{"bridges":[{"connection":"testBridge","addresses":[{"address":"127.0.0.1","port":1884}],"topics":[{"topic":"#","direction":"both","qos":0,"local_prefix":"test/1883/","remote_prefix":"test/1884/"}],"local_clientid":"1883","remote_clientid":"1884"}]}'
#ifdef WITH_CJSON

/*
 * Build the effective topic string from an optional prefix and an optional
 * topic. Returns a newly allocated string, or NULL on allocation failure.
 * Callers must ensure that prefix and topic are not both NULL.
 */
static char *bridge__json_join_topic(const char *prefix, const char *topic)
{
	char *joined;
	size_t size;

	if(!prefix){
		return mosquitto__strdup(topic);
	}
	if(!topic){
		return mosquitto__strdup(prefix);
	}
	size = strlen(prefix) + strlen(topic) + 1;
	joined = mosquitto__malloc(size);
	if(joined){
		snprintf(joined, size, "%s%s", prefix, topic);
	}
	return joined;
}

/*
 * Copy the optional string member `key` of `object` into *dest.
 * A missing or non-string member leaves *dest untouched. The literal value
 * "" (two quote characters) stores NULL, as in the text configuration format.
 * Returns MOSQ_ERR_SUCCESS or MOSQ_ERR_NOMEM.
 */
static int bridge__json_copy_string(const cJSON *object, const char *key, char **dest)
{
	const cJSON *item = cJSON_GetObjectItemCaseSensitive(object, key);

	if(!cJSON_IsString(item) || item->valuestring == NULL){
		return MOSQ_ERR_SUCCESS;
	}
	if(!strcmp(item->valuestring, "\"\"")){
		*dest = NULL;
		return MOSQ_ERR_SUCCESS;
	}
	*dest = mosquitto__strdup(item->valuestring);
	if(!*dest){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
		return MOSQ_ERR_NOMEM;
	}
	return MOSQ_ERR_SUCCESS;
}

/*
 * Validate and copy an optional topic prefix ("local" or "remote") into *dest.
 * A missing or non-string item leaves *dest untouched; the literal value ""
 * stores NULL. Returns MOSQ_ERR_SUCCESS, MOSQ_ERR_INVAL or MOSQ_ERR_NOMEM.
 */
static int bridge__json_copy_prefix(const cJSON *item, const char *which, char **dest)
{
	if(!cJSON_IsString(item) || item->valuestring == NULL){
		return MOSQ_ERR_SUCCESS;
	}
	if(!strcmp(item->valuestring, "\"\"")){
		*dest = NULL;
		return MOSQ_ERR_SUCCESS;
	}
	if(mosquitto_pub_topic_check(item->valuestring) != MOSQ_ERR_SUCCESS){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic %s prefix '%s'.", which, item->valuestring);
		return MOSQ_ERR_INVAL;
	}
	*dest = mosquitto__strdup(item->valuestring);
	if(!*dest){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
		return MOSQ_ERR_NOMEM;
	}
	return MOSQ_ERR_SUCCESS;
}

/*
 * Append one {"address": ..., "port": ...} entry to the bridge.
 * An out-of-range port is always an error. Entries without a string
 * "address" are skipped. A missing "port" defaults to 1883, matching the
 * text configuration parser.
 */
static int bridge__json_add_address(struct mosquitto__bridge *bridge, const cJSON *item)
{
	const cJSON *address_json = cJSON_GetObjectItemCaseSensitive(item, "address");
	const cJSON *port_json = cJSON_GetObjectItemCaseSensitive(item, "port");
	struct bridge_address *grown;
	struct bridge_address *entry;

	if(cJSON_IsNumber(port_json) && (port_json->valueint < 1 || port_json->valueint > UINT16_MAX)){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid port value (%d).", port_json->valueint);
		return MOSQ_ERR_INVAL;
	}
	if(!cJSON_IsString(address_json) || address_json->valuestring == NULL){
		return MOSQ_ERR_SUCCESS;
	}

	/* Grow through a temporary so the existing array is not lost on failure. */
	grown = mosquitto__realloc(bridge->addresses, sizeof(struct bridge_address)*(size_t)(bridge->address_count+1));
	if(!grown){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
		return MOSQ_ERR_NOMEM;
	}
	bridge->addresses = grown;
	entry = &bridge->addresses[bridge->address_count];
	memset(entry, 0, sizeof(*entry));
	bridge->address_count++;

	entry->address = mosquitto__strdup(address_json->valuestring);
	if(!entry->address){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
		return MOSQ_ERR_NOMEM;
	}
	entry->port = cJSON_IsNumber(port_json) ? (uint16_t)port_json->valueint : 1883;
	return MOSQ_ERR_SUCCESS;
}

/*
 * Append one topic pattern object to the bridge. Recognised members:
 * "topic" (required string), "direction" ("in"/"out"/"both", default out),
 * "qos" (0..2, default 0), "local_prefix" and "remote_prefix".
 */
static int bridge__json_add_topic(struct mosquitto__bridge *bridge, const cJSON *item)
{
	const cJSON *topic_json = cJSON_GetObjectItemCaseSensitive(item, "topic");
	const cJSON *direction_json = cJSON_GetObjectItemCaseSensitive(item, "direction");
	const cJSON *qos_json = cJSON_GetObjectItemCaseSensitive(item, "qos");
	const cJSON *local_prefix_json = cJSON_GetObjectItemCaseSensitive(item, "local_prefix");
	const cJSON *remote_prefix_json = cJSON_GetObjectItemCaseSensitive(item, "remote_prefix");
	struct mosquitto__bridge_topic *grown;
	struct mosquitto__bridge_topic *cur_topic;
	int rc;

	if(!cJSON_IsString(topic_json) || topic_json->valuestring == NULL){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty topic value in configuration.");
		return MOSQ_ERR_INVAL;
	}

	grown = mosquitto__realloc(bridge->topics, sizeof(struct mosquitto__bridge_topic)*(size_t)(bridge->topic_count+1));
	if(!grown){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
		return MOSQ_ERR_NOMEM;
	}
	bridge->topics = grown;
	cur_topic = &bridge->topics[bridge->topic_count];
	/* Zero the new slot so every pointer member is NULL until it is set. */
	memset(cur_topic, 0, sizeof(*cur_topic));
	bridge->topic_count++;

	if(strcmp(topic_json->valuestring, "\"\"")){
		cur_topic->topic = mosquitto__strdup(topic_json->valuestring);
		if(!cur_topic->topic){
			log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
			return MOSQ_ERR_NOMEM;
		}
	}
	cur_topic->direction = bd_out;
	cur_topic->qos = 0;

	if(cJSON_IsString(direction_json) && direction_json->valuestring != NULL){
		if(!strcasecmp(direction_json->valuestring, "out")){
			cur_topic->direction = bd_out;
		}else if(!strcasecmp(direction_json->valuestring, "in")){
			cur_topic->direction = bd_in;
		}else if(!strcasecmp(direction_json->valuestring, "both")){
			cur_topic->direction = bd_both;
		}else{
			log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic direction '%s'.", direction_json->valuestring);
			return MOSQ_ERR_INVAL;
		}
	}

	if(cJSON_IsNumber(qos_json)){
		if(qos_json->valueint < 0 || qos_json->valueint > 2){
			log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge QoS level '%d'.", qos_json->valueint);
			return MOSQ_ERR_INVAL;
		}
		cur_topic->qos = (uint8_t)qos_json->valueint;
	}

	/* Any string local_prefix (even the "" marker) switches remapping on. */
	if(cJSON_IsString(local_prefix_json) && local_prefix_json->valuestring != NULL){
		bridge->topic_remapping = true;
	}
	rc = bridge__json_copy_prefix(local_prefix_json, "local", &cur_topic->local_prefix);
	if(rc != MOSQ_ERR_SUCCESS){
		return rc;
	}
	rc = bridge__json_copy_prefix(remote_prefix_json, "remote", &cur_topic->remote_prefix);
	if(rc != MOSQ_ERR_SUCCESS){
		return rc;
	}

	/* An empty topic is only meaningful when both prefixes are given. */
	if(cur_topic->topic == NULL &&
			(cur_topic->local_prefix == NULL || cur_topic->remote_prefix == NULL)){

		log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge remapping.");
		return MOSQ_ERR_INVAL;
	}

	cur_topic->local_topic = bridge__json_join_topic(cur_topic->local_prefix, cur_topic->topic);
	if(!cur_topic->local_topic){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
		return MOSQ_ERR_NOMEM;
	}
	cur_topic->remote_topic = bridge__json_join_topic(cur_topic->remote_prefix, cur_topic->topic);
	if(!cur_topic->remote_topic){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
		return MOSQ_ERR_NOMEM;
	}
	return MOSQ_ERR_SUCCESS;
}

#endif /* WITH_CJSON */

/*
 * Parse a "$BRIDGE/new" payload given as JSON and append the described bridge
 * to config->bridges.
 *
 * Expected shape:
 *   {"bridges":[{"connection":NAME,
 *                "addresses":[{"address":HOST,"port":PORT}, ...],
 *                "topics":[{"topic":..,"direction":..,"qos":..,
 *                           "local_prefix":..,"remote_prefix":..}, ...],
 *                "remote_username":.., "try_private":.., "notification_topic":..,
 *                "local_clientid":.., "remote_clientid":..}]}
 *
 * Only the first element of "bridges" is processed. If the payload is not
 * valid JSON, or the broker was built without cJSON, parsing is delegated to
 * bridge__dynamic_parse_payload_new().
 *
 * db      - consulted for already existing bridge names.
 * payload - text to parse; passed to cJSON_Parse(), so presumably
 *           NUL-terminated (TODO confirm against the caller).
 * config  - receives the new bridge. On failure a partially filled bridge may
 *           remain in config->bridges; releasing it is left to the caller.
 *
 * Returns MOSQ_ERR_SUCCESS, MOSQ_ERR_INVAL or MOSQ_ERR_NOMEM.
 */
int bridge__dynamic_parse_payload_new_json(struct mosquitto_db *db, void* payload, struct mosquitto__config *config)
{
#ifndef WITH_CJSON
	return bridge__dynamic_parse_payload_new(db, payload, config);
#else
	int rc = MOSQ_ERR_INVAL;
	int i;
	int bridges_count_json;
	struct mosquitto__bridge *cur_bridge;
	struct mosquitto__bridge *grown;
	cJSON *message_json;
	const cJSON *bridges_json;
	const cJSON *bridge_json;
	const cJSON *connection_json;
	const cJSON *addresses_json;
	const cJSON *topics_json;
	const cJSON *try_private;
	const cJSON *item_json = NULL;
	const char *error_ptr;

	message_json = cJSON_Parse(payload);
	if(message_json == NULL){
		error_ptr = cJSON_GetErrorPtr();
		if(error_ptr != NULL){
			log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unable to parse JSON Message for bridge dynamic. Maybe normal message configuration. %s", error_ptr);
		}
		return bridge__dynamic_parse_payload_new(db, payload, config);
	}

	bridges_json = cJSON_GetObjectItemCaseSensitive(message_json, "bridges");
	if(!cJSON_IsArray(bridges_json)){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
		rc = MOSQ_ERR_INVAL;
		goto cleanup;
	}
	bridges_count_json = cJSON_GetArraySize(bridges_json);
	log__printf(NULL, MOSQ_LOG_DEBUG, "Information : %d bridge(s) dynamic.", bridges_count_json);
	if(bridges_count_json == 0){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: None Bridge in configuration.");
		rc = MOSQ_ERR_INVAL;
		goto cleanup;
	}

	/* Only the first bridge of the array is handled for now. */
	bridge_json = cJSON_GetArrayItem(bridges_json, 0);

	/* The connection name is mandatory: without it there is no bridge to fill in. */
	connection_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "connection");
	if(!cJSON_IsString(connection_json) || connection_json->valuestring == NULL){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty connection value in configuration.");
		rc = MOSQ_ERR_INVAL;
		goto cleanup;
	}

	/* Check for existing bridge name. */
	for(i=0; i<db->bridge_count; i++){
		if(!strcmp(db->bridges[i]->bridge->name, connection_json->valuestring)){
			log__printf(NULL, MOSQ_LOG_ERR, "Error: Duplicate bridge name \"%s\".", connection_json->valuestring);
			rc = MOSQ_ERR_INVAL;
			goto cleanup;
		}
	}

	grown = mosquitto__realloc(config->bridges, (size_t)(config->bridge_count+1)*sizeof(struct mosquitto__bridge));
	if(!grown){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
		rc = MOSQ_ERR_NOMEM;
		goto cleanup;
	}
	config->bridges = grown;
	cur_bridge = &(config->bridges[config->bridge_count]);
	memset(cur_bridge, 0, sizeof(struct mosquitto__bridge));
	config->bridge_count++;

	cur_bridge->name = mosquitto__strdup(connection_json->valuestring);
	if(!cur_bridge->name){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
		rc = MOSQ_ERR_NOMEM;
		goto cleanup;
	}
	cur_bridge->keepalive = 60;
	cur_bridge->notifications = true;
	cur_bridge->notifications_local_only = false;
	cur_bridge->start_type = bst_automatic;
	cur_bridge->idle_timeout = 60;
	cur_bridge->restart_timeout = 0;
	cur_bridge->backoff_base = 5;
	cur_bridge->backoff_cap = 30;
	cur_bridge->threshold = 10;
	cur_bridge->try_private = true;
	cur_bridge->attempt_unsubscribe = true;
	cur_bridge->protocol_version = mosq_p_mqtt311;
	cur_bridge->primary_retry_sock = INVALID_SOCKET;

	/* Addresses */
	addresses_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "addresses");
	if(!cJSON_IsArray(addresses_json)){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration (addresses).");
		rc = MOSQ_ERR_INVAL;
		goto cleanup;
	}
	if(cJSON_GetArraySize(addresses_json) == 0){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: None address in bridge configuration.");
		rc = MOSQ_ERR_INVAL;
		goto cleanup;
	}
	cJSON_ArrayForEach(item_json, addresses_json){
		rc = bridge__json_add_address(cur_bridge, item_json);
		if(rc != MOSQ_ERR_SUCCESS){
			goto cleanup;
		}
	}

	/* Topic patterns */
	topics_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "topics");
	if(!cJSON_IsArray(topics_json)){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration (topics).");
		rc = MOSQ_ERR_INVAL;
		goto cleanup;
	}
	if(cJSON_GetArraySize(topics_json) == 0){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: None topic in bridge configuration.");
		rc = MOSQ_ERR_INVAL;
		goto cleanup;
	}
	cJSON_ArrayForEach(item_json, topics_json){
		rc = bridge__json_add_topic(cur_bridge, item_json);
		if(rc != MOSQ_ERR_SUCCESS){
			goto cleanup;
		}
	}

	/* Optional per-bridge settings */
	rc = bridge__json_copy_string(bridge_json, "remote_username", &cur_bridge->remote_username);
	if(rc != MOSQ_ERR_SUCCESS){
		goto cleanup;
	}

	try_private = cJSON_GetObjectItemCaseSensitive(bridge_json, "try_private");
	if(cJSON_IsBool(try_private)){
		cur_bridge->try_private = cJSON_IsTrue(try_private) ? true : false;
	}

	rc = bridge__json_copy_string(bridge_json, "notification_topic", &cur_bridge->notification_topic);
	if(rc != MOSQ_ERR_SUCCESS){
		goto cleanup;
	}
	rc = bridge__json_copy_string(bridge_json, "local_clientid", &cur_bridge->local_clientid);
	if(rc != MOSQ_ERR_SUCCESS){
		goto cleanup;
	}
	rc = bridge__json_copy_string(bridge_json, "remote_clientid", &cur_bridge->remote_clientid);
	if(rc != MOSQ_ERR_SUCCESS){
		goto cleanup;
	}

	/* Last verification: address entries without a usable "address" were skipped above. */
	if(cur_bridge->address_count == 0){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty address value in configuration.");
		rc = MOSQ_ERR_INVAL;
		goto cleanup;
	}
	if(cur_bridge->topic_count == 0){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty topics value in configuration.");
		rc = MOSQ_ERR_INVAL;
		goto cleanup;
	}
	rc = MOSQ_ERR_SUCCESS;

cleanup:
	cJSON_Delete(message_json);
	return rc;
#endif
}
int bridge__dynamic_parse_payload_new(struct mosquitto_db *db, void* payload, struct mosquitto__config *config)
{
char *buf = NULL;
char *token;
int tmp_int;
char *saveptr = NULL;
struct mosquitto__bridge *cur_bridge = NULL;
struct mosquitto__bridge_topic *cur_topic;
char *address;
int i;
int len;
int nb_param = 0;
if(!payload) return MOSQ_ERR_INVAL;
buf = strtok(payload, "\n");
while(buf) {
if(buf[0] != '#' && buf[0] != 10 && buf[0] != 13){
while(buf[strlen(buf)-1] == 10 || buf[strlen(buf)-1] == 13){
buf[strlen(buf)-1] = 0;
}
token = strtok_r(buf, " ", &saveptr);
if(token)
{
if(!strcmp(token, "address") || !strcmp(token, "addresses"))
{
nb_param ++;
if(!cur_bridge || cur_bridge->addresses){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
while((token = strtok_r(NULL, " ", &saveptr))){
cur_bridge->address_count++;
cur_bridge->addresses = mosquitto__realloc(cur_bridge->addresses, sizeof(struct bridge_address)*(size_t)cur_bridge->address_count);
if(!cur_bridge->addresses){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
cur_bridge->addresses[cur_bridge->address_count-1].address = token;
}
for(i=0; i<cur_bridge->address_count; i++){
address = strtok_r(cur_bridge->addresses[i].address, ":", &saveptr);
if(address){
token = strtok_r(NULL, ":", &saveptr);
if(token){
tmp_int = atoi(token);
if(tmp_int < 1 || tmp_int > UINT16_MAX){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid port value (%d).", tmp_int);
return MOSQ_ERR_INVAL;
}
cur_bridge->addresses[i].port = (uint16_t)tmp_int;
}else{
cur_bridge->addresses[i].port = 1883;
}
cur_bridge->addresses[i].address = mosquitto__strdup(address);
//_conf_attempt_resolve(address, "bridge address", MOSQ_LOG_WARNING, "Warning");
}
}
if(cur_bridge->address_count == 0){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty address value in configuration.");
return MOSQ_ERR_INVAL;
}
}
else if(!strcmp(token, "connection"))
{
nb_param ++;
//if(reload) continue; // FIXME
token = strtok_r(NULL, " ", &saveptr);
if(token){
/* Check for existing bridge name. */
for(i=0; i<db->bridge_count; i++){
if(!strcmp(db->bridges[i]->bridge->name, token)){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Duplicate bridge name \"%s\".", token);
return MOSQ_ERR_INVAL;
}
}
config->bridge_count++;
config->bridges = mosquitto__realloc(config->bridges, (size_t)config->bridge_count*sizeof(struct mosquitto__bridge));
if(!config->bridges){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
cur_bridge = &(config->bridges[config->bridge_count-1]);
memset(cur_bridge, 0, sizeof(struct mosquitto__bridge));
cur_bridge->name = mosquitto__strdup(token);
if(!cur_bridge->name){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
cur_bridge->keepalive = 60;
cur_bridge->notifications = true;
cur_bridge->notifications_local_only = false;
cur_bridge->start_type = bst_automatic;
cur_bridge->idle_timeout = 60;
cur_bridge->restart_timeout = 0;
cur_bridge->backoff_base = 5;
cur_bridge->backoff_cap = 30;
cur_bridge->threshold = 10;
cur_bridge->try_private = true;
cur_bridge->attempt_unsubscribe = true;
cur_bridge->protocol_version = mosq_p_mqtt311;
cur_bridge->primary_retry_sock = INVALID_SOCKET;
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty connection value in configuration.");
return MOSQ_ERR_INVAL;
}
}
else if(!strcmp(token, "try_private"))
{
nb_param ++;
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
if(!strcmp(token,"false")){
cur_bridge->try_private = false;
}else if(strcmp(token,"true")){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
}
}
else if(!strcmp(token, "notification_topic"))
{
nb_param ++;
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
cur_bridge->notification_topic = mosquitto__strdup(token);
}
}
else if(!strcmp(token, "remote_username"))
{
log__printf(NULL, MOSQ_LOG_INFO, "Found remote_username");
nb_param ++;
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
cur_bridge->remote_username = mosquitto__strdup(token);
}
}
else if(!strcmp(token, "local_clientid"))
{
nb_param ++;
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
cur_bridge->local_clientid = mosquitto__strdup(token);
}
}
else if(!strcmp(token, "remote_clientid"))
{
nb_param ++;
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
cur_bridge->remote_clientid = mosquitto__strdup(token);
}
}
else if(!strcmp(token, "topic"))
{
nb_param ++;
if(!cur_bridge){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
return MOSQ_ERR_INVAL;
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
cur_bridge->topic_count++;
cur_bridge->topics = mosquitto__realloc(cur_bridge->topics,
sizeof(struct mosquitto__bridge_topic)*(size_t)cur_bridge->topic_count);
if(!cur_bridge->topics){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
cur_topic = &cur_bridge->topics[cur_bridge->topic_count-1];
if(!strcmp(token, "\"\"")){
cur_topic->topic = NULL;
}else{
cur_topic->topic = mosquitto__strdup(token);
if(!cur_topic->topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
}
cur_topic->direction = bd_out;
cur_topic->qos = 0;
cur_topic->local_prefix = NULL;
cur_topic->remote_prefix = NULL;
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty topic value in configuration.");
return MOSQ_ERR_INVAL;
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
if(!strcasecmp(token, "out")){
cur_topic->direction = bd_out;
}else if(!strcasecmp(token, "in")){
cur_topic->direction = bd_in;
}else if(!strcasecmp(token, "both")){
cur_topic->direction = bd_both;
}else{
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic direction '%s'.", token);
return MOSQ_ERR_INVAL;
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
cur_topic->qos = (uint8_t)atoi(token);
if(cur_topic->qos < 0 || cur_topic->qos > 2){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge QoS level '%s'.", token);
return MOSQ_ERR_INVAL;
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
cur_bridge->topic_remapping = true;
if(!strcmp(token, "\"\"")){
cur_topic->local_prefix = NULL;
}else{
if(mosquitto_pub_topic_check(token) != MOSQ_ERR_SUCCESS){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic local prefix '%s'.", token);
return MOSQ_ERR_INVAL;
}
cur_topic->local_prefix = mosquitto__strdup(token);
if(!cur_topic->local_prefix){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
}
token = strtok_r(NULL, " ", &saveptr);
if(token){
if(!strcmp(token, "\"\"")){
cur_topic->remote_prefix = NULL;
}else{
if(mosquitto_pub_topic_check(token) != MOSQ_ERR_SUCCESS){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic remote prefix '%s'.", token);
return MOSQ_ERR_INVAL;
}
cur_topic->remote_prefix = mosquitto__strdup(token);
if(!cur_topic->remote_prefix){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
}
}
}
}
}
if(cur_topic->topic == NULL &&
(cur_topic->local_prefix == NULL || cur_topic->remote_prefix == NULL)){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge remapping.");
return MOSQ_ERR_INVAL;
}
if(cur_topic->local_prefix){
if(cur_topic->topic){
len = (int)strlen(cur_topic->topic) + (int)strlen(cur_topic->local_prefix)+1;
cur_topic->local_topic = mosquitto__malloc((size_t)len+1);
if(!cur_topic->local_topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
snprintf(cur_topic->local_topic, (size_t)len+1, "%s%s", cur_topic->local_prefix, cur_topic->topic);
cur_topic->local_topic[len] = '\0';
}else{
cur_topic->local_topic = mosquitto__strdup(cur_topic->local_prefix);
if(!cur_topic->local_topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
}
}else{
cur_topic->local_topic = mosquitto__strdup(cur_topic->topic);
if(!cur_topic->local_topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
}
if(cur_topic->remote_prefix){
if(cur_topic->topic){
len = (int)strlen(cur_topic->topic) + (int)strlen(cur_topic->remote_prefix)+1;
cur_topic->remote_topic = mosquitto__malloc((size_t)len+1);
if(!cur_topic->remote_topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
snprintf(cur_topic->remote_topic, (size_t)len, "%s%s", cur_topic->remote_prefix, cur_topic->topic);
cur_topic->remote_topic[len] = '\0';
}else{
cur_topic->remote_topic = mosquitto__strdup(cur_topic->remote_prefix);
if(!cur_topic->remote_topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
}
}else{
cur_topic->remote_topic = mosquitto__strdup(cur_topic->topic);
if(!cur_topic->remote_topic){
log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
return MOSQ_ERR_NOMEM;
}
}
}
}
}
buf = strtok(NULL, "\n");
}
if(nb_param>=3){
return MOSQ_ERR_SUCCESS;
}else{
return MOSQ_ERR_INVAL;
}
}