mosquitto icon indicating copy to clipboard operation
mosquitto copied to clipboard

Dynamic bridge - multi pattern (solution)

Open de-wu opened this issue 3 years ago • 0 comments

Hello,

The bridge JSON format has some limitations, because the code does not allow adding multiple topic patterns in JSON format. So I changed the bridge_dynamic.c file (the bridge__dynamic_parse_payload_new_json function) to allow adding multiple patterns to a bridge. Additionally, I added functionality to specify local_clientid and remote_clientid via JSON (and also in the normal, non-JSON parsing). You can check my code and consider merging this functionality into the dynamic-bridge branch.

The JSON format after the changes: mosquitto_pub -h 127.0.0.1 -p 1883 -t '$BRIDGE/new' -m '{"bridges":[{"connection":"testBridge","addresses":[{"address":"127.0.0.1","port":1884}],"topics":[{"topic":"#","direction":"both","qos":0,"local_prefix":"test/1883/","remote_prefix":"test/1884/"}],"local_clientid":"1883","remote_clientid":"1884"}]}'

int bridge__dynamic_parse_payload_new_json(struct mosquitto_db *db, void* payload, struct mosquitto__config *config)
{
	int i;
	int len;
	struct mosquitto__bridge *cur_bridge = NULL;
	struct mosquitto__bridge_topic *cur_topic;

#ifndef WITH_CJSON
  return bridge__dynamic_parse_payload_new(db, payload, config);
#endif

	cJSON *message_json = cJSON_Parse(payload);
	if(message_json == NULL){
		const char *error_ptr = cJSON_GetErrorPtr();
		if(error_ptr != NULL){
			log__printf(NULL, MOSQ_LOG_WARNING, "Warning: Unable to parse JSON Message for bridge dynamic. Maybe normal message configuration. %s", error_ptr);
		}
		cJSON_Delete(message_json);
		return bridge__dynamic_parse_payload_new(db, payload, config);
	}

  const cJSON *bridges_json = NULL;
	const cJSON *addresses_json = NULL;
	int bridges_count_json = 0;
	int addresses_count_json = 0;

  bridges_json = cJSON_GetObjectItemCaseSensitive(message_json, "bridges");
	if(!cJSON_IsArray(bridges_json)) {
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
		cJSON_Delete(message_json);
		return MOSQ_ERR_INVAL;
	}
  bridges_count_json = cJSON_GetArraySize(bridges_json);
	log__printf(NULL, MOSQ_LOG_DEBUG, "Information : %d bridge(s) dynamic.", bridges_count_json);
	if(bridges_count_json == 0){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: None Bridge in configuration.");
		cJSON_Delete(message_json);
		return MOSQ_ERR_INVAL;
	}
  const cJSON *bridge_json = NULL;
	// Actually, just one bridge defined in configuration bridges. Work in progress...
	int index = 0;
	bridge_json = cJSON_GetArrayItem(bridges_json, index);

	const cJSON *connection_json = NULL;
    const cJSON *topics_json = NULL;
	const cJSON *remote_username = NULL;
	const cJSON *try_private = NULL;
	const cJSON *notification_topic = NULL;
    const cJSON *local_clientid = NULL;
	const cJSON *remote_clientid = NULL;
    int topics_count_json = 0;

	connection_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "connection");
	if(cJSON_IsString(connection_json) && (connection_json->valuestring != NULL)) {
		/* Check for existing bridge name. */
		for(i=0; i<db->bridge_count; i++){
			if(!strcmp(db->bridges[i]->bridge->name, connection_json->valuestring)){
				log__printf(NULL, MOSQ_LOG_ERR, "Error: Duplicate bridge name \"%s\".", connection_json->valuestring);
				cJSON_Delete(message_json);
				return MOSQ_ERR_INVAL;
			}
		}
		config->bridge_count++;
		config->bridges = mosquitto__realloc(config->bridges, (size_t)config->bridge_count*sizeof(struct mosquitto__bridge));
		if(!config->bridges){
			log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
			cJSON_Delete(message_json);
			return MOSQ_ERR_NOMEM;
		}
		cur_bridge = &(config->bridges[config->bridge_count-1]);
		memset(cur_bridge, 0, sizeof(struct mosquitto__bridge));
		cur_bridge->name = mosquitto__strdup(connection_json->valuestring);
		if(!cur_bridge->name){
			log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
			cJSON_Delete(message_json);
			return MOSQ_ERR_NOMEM;
		}
		cur_bridge->keepalive = 60;
		cur_bridge->notifications = true;
		cur_bridge->notifications_local_only = false;
		cur_bridge->start_type = bst_automatic;
		cur_bridge->idle_timeout = 60;
		cur_bridge->restart_timeout = 0;
		cur_bridge->backoff_base = 5;
		cur_bridge->backoff_cap = 30;
		cur_bridge->threshold = 10;
		cur_bridge->try_private = true;
		cur_bridge->attempt_unsubscribe = true;
		cur_bridge->protocol_version = mosq_p_mqtt311;
		cur_bridge->primary_retry_sock = INVALID_SOCKET;
	}

	addresses_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "addresses");
	if(!cJSON_IsArray(addresses_json)) {
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration (addresses).");
		cJSON_Delete(message_json);
		return MOSQ_ERR_INVAL;
	}
	addresses_count_json = cJSON_GetArraySize(addresses_json);
	if(addresses_count_json == 0){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: None address in bridge configuration.");
		cJSON_Delete(message_json);
		return MOSQ_ERR_INVAL;
	}
	const cJSON *addressItem_json = NULL;
	cJSON_ArrayForEach(addressItem_json, addresses_json) {
	  cJSON *address_json = cJSON_GetObjectItemCaseSensitive(addressItem_json, "address");
	  cJSON *port_json = cJSON_GetObjectItemCaseSensitive(addressItem_json, "port");

	  if(cJSON_IsString(address_json) && (address_json->valuestring != NULL)) {
	  	if(!cur_bridge || cur_bridge->addresses){
		  	log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
		  	cJSON_Delete(message_json);
				return MOSQ_ERR_INVAL;
		  }
		  cur_bridge->address_count++;
		  cur_bridge->addresses = mosquitto__realloc(cur_bridge->addresses, (size_t)cur_bridge->address_count*sizeof(struct bridge_address));
		  if(!cur_bridge->addresses){
		  	log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
		  	cJSON_Delete(message_json);
				return MOSQ_ERR_NOMEM;
		  }
		  cur_bridge->addresses[cur_bridge->address_count-1].address = mosquitto__strdup(address_json->valuestring);
  	}

	  if(cJSON_IsNumber(port_json)){
		  if(port_json->valueint < 1 || port_json->valueint > UINT16_MAX){
		  	log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid port value (%d).", port_json->valueint);
		  	cJSON_Delete(message_json);
				return MOSQ_ERR_INVAL;
	  	}
	  	cur_bridge->addresses[cur_bridge->address_count-1].port = (uint16_t)port_json->valueint;
	  }
  }

    topics_json = cJSON_GetObjectItemCaseSensitive(bridge_json, "topics");
    if(!cJSON_IsArray(addresses_json)) {
        log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration (topics).");
        cJSON_Delete(message_json);
        return MOSQ_ERR_INVAL;
    }

    topics_count_json = cJSON_GetArraySize(topics_json);
    if(topics_count_json == 0){
        log__printf(NULL, MOSQ_LOG_ERR, "Error: None topic in bridge configuration.");
        cJSON_Delete(message_json);
        return MOSQ_ERR_INVAL;
    }

    const cJSON *topicItem_json = NULL;
    cJSON_ArrayForEach(topicItem_json, topics_json) {
        cJSON *topic_json = cJSON_GetObjectItemCaseSensitive(topicItem_json, "topic");
        cJSON *direction_json = cJSON_GetObjectItemCaseSensitive(topicItem_json, "direction");
        cJSON *qos_json = cJSON_GetObjectItemCaseSensitive(topicItem_json, "qos");
        cJSON *local_prefix_json = cJSON_GetObjectItemCaseSensitive(topicItem_json, "local_prefix");
        cJSON *remote_prefix_json = cJSON_GetObjectItemCaseSensitive(topicItem_json, "remote_prefix");

        if(cJSON_IsString(topic_json) && (topic_json->valuestring != NULL)) {
            cur_bridge->topic_count++;
            cur_bridge->topics = mosquitto__realloc(cur_bridge->topics,
                                                    sizeof(struct mosquitto__bridge_topic)*(size_t)cur_bridge->topic_count);
            if(!cur_bridge->topics){
                log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
                cJSON_Delete(message_json);
                return MOSQ_ERR_NOMEM;
            }
            cur_topic = &cur_bridge->topics[cur_bridge->topic_count-1];
            if(!strcmp(topic_json->valuestring, "\"\"")){
                cur_topic->topic = NULL;
            }else{
                cur_topic->topic = mosquitto__strdup(topic_json->valuestring);
                if(!cur_topic->topic){
                    log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
                    cJSON_Delete(message_json);
                    return MOSQ_ERR_NOMEM;
                }
            }
            cur_topic->direction = bd_out;
            cur_topic->qos = 0;
            cur_topic->local_prefix = NULL;
            cur_topic->remote_prefix = NULL;
        }else{
            log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty topic value in configuration.");
            cJSON_Delete(message_json);
            return MOSQ_ERR_INVAL;
        }

        if(cJSON_IsString(direction_json) && (direction_json->valuestring != NULL)) {
            if(!strcasecmp(direction_json->valuestring, "out")){
                cur_topic->direction = bd_out;
            }else if(!strcasecmp(direction_json->valuestring, "in")){
                cur_topic->direction = bd_in;
            }else if(!strcasecmp(direction_json->valuestring, "both")){
                cur_topic->direction = bd_both;
            }else{
                log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic direction '%s'.", direction_json->valuestring);
                cJSON_Delete(message_json);
                return MOSQ_ERR_INVAL;
            }
        }

        if(cJSON_IsNumber(qos_json)){
            if(qos_json->valueint < 0 || qos_json->valueint > 2){
                log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge QoS level '%d'.", qos_json->valueint);
                cJSON_Delete(message_json);
                return MOSQ_ERR_INVAL;
            }
            cur_topic->qos = (uint8_t)qos_json->valueint;
        }

        if(cJSON_IsString(local_prefix_json) && (local_prefix_json->valuestring != NULL)) {
            cur_bridge->topic_remapping = true;
            if(!strcmp(local_prefix_json->valuestring, "\"\"")){
                cur_topic->local_prefix = NULL;
            }else{
                if(mosquitto_pub_topic_check(local_prefix_json->valuestring) != MOSQ_ERR_SUCCESS){
                    log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic local prefix '%s'.", local_prefix_json->valuestring);
                    cJSON_Delete(message_json);
                    return MOSQ_ERR_INVAL;
                }
                cur_topic->local_prefix = mosquitto__strdup(local_prefix_json->valuestring);
                if(!cur_topic->local_prefix){
                    log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
                    cJSON_Delete(message_json);
                    return MOSQ_ERR_NOMEM;
                }
            }
        }

        if(cJSON_IsString(remote_prefix_json) && (remote_prefix_json->valuestring != NULL)) {
            if(!strcmp(remote_prefix_json->valuestring, "\"\"")){
                cur_topic->remote_prefix = NULL;
            }else{
                if(mosquitto_pub_topic_check(remote_prefix_json->valuestring) != MOSQ_ERR_SUCCESS){
                    log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic remote prefix '%s'.", remote_prefix_json->valuestring);
                    cJSON_Delete(message_json);
                    return MOSQ_ERR_INVAL;
                }
                cur_topic->remote_prefix = mosquitto__strdup(remote_prefix_json->valuestring);
                if(!cur_topic->remote_prefix){
                    log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
                    cJSON_Delete(message_json);
                    return MOSQ_ERR_NOMEM;
                }
            }
        }

        if(cur_topic->topic == NULL &&
                (cur_topic->local_prefix == NULL || cur_topic->remote_prefix == NULL)){
            log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge remapping.");
            cJSON_Delete(message_json);
            return MOSQ_ERR_INVAL;
        }

        if(cur_topic->local_prefix){
            if(cur_topic->topic){
                len = (int)strlen(cur_topic->topic) + (int)strlen(cur_topic->local_prefix)+1;
                cur_topic->local_topic = mosquitto__malloc((size_t)len+1);
                if(!cur_topic->local_topic){
                    log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
                    cJSON_Delete(message_json);
                    return MOSQ_ERR_NOMEM;
                }
                snprintf(cur_topic->local_topic, (size_t)len+1, "%s%s", cur_topic->local_prefix, cur_topic->topic);
                cur_topic->local_topic[len] = '\0';
            }else{
                cur_topic->local_topic = mosquitto__strdup(cur_topic->local_prefix);
                if(!cur_topic->local_topic){
                    log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
                    cJSON_Delete(message_json);
                    return MOSQ_ERR_NOMEM;
                }
            }
        }else{
            cur_topic->local_topic = mosquitto__strdup(cur_topic->topic);
            if(!cur_topic->local_topic){
                log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
                cJSON_Delete(message_json);
                return MOSQ_ERR_NOMEM;
            }
        }

        if(cur_topic->remote_prefix){
            if(cur_topic->topic){
                len = (int)strlen(cur_topic->topic) + (int)strlen(cur_topic->remote_prefix)+1;
                cur_topic->remote_topic = mosquitto__malloc((size_t)len+1);
                if(!cur_topic->remote_topic){
                    log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
                    cJSON_Delete(message_json);
                    return MOSQ_ERR_NOMEM;
                }
                snprintf(cur_topic->remote_topic, (size_t)len, "%s%s", cur_topic->remote_prefix, cur_topic->topic);
                cur_topic->remote_topic[len] = '\0';
            }else{
                cur_topic->remote_topic = mosquitto__strdup(cur_topic->remote_prefix);
                if(!cur_topic->remote_topic){
                    log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
                    cJSON_Delete(message_json);
                    return MOSQ_ERR_NOMEM;
                }
            }
        }else{
            cur_topic->remote_topic = mosquitto__strdup(cur_topic->topic);
            if(!cur_topic->remote_topic){
                log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
                cJSON_Delete(message_json);
                return MOSQ_ERR_NOMEM;
            }
        }
    }
	remote_username = cJSON_GetObjectItemCaseSensitive(bridge_json, "remote_username");
	if(cJSON_IsString(remote_username) && (remote_username->valuestring != NULL)) {
		if(!strcmp(remote_username->valuestring, "\"\"")){
			cur_bridge->remote_username = NULL;
		}else{
			cur_bridge->remote_username = mosquitto__strdup(remote_username->valuestring);
			if(!cur_bridge->remote_username){
				log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
				cJSON_Delete(message_json);
				return MOSQ_ERR_NOMEM;
			}
		}
	}
	try_private = cJSON_GetObjectItemCaseSensitive(bridge_json, "try_private");
	if(cJSON_IsBool(try_private)){
		cur_bridge->try_private = cJSON_IsTrue(try_private) ? true : false;
	}
	notification_topic = cJSON_GetObjectItemCaseSensitive(bridge_json, "notification_topic");
	if(cJSON_IsString(notification_topic) && (notification_topic->valuestring != NULL)) {
		if(!strcmp(notification_topic->valuestring, "\"\"")){
			cur_bridge->notification_topic = NULL;
		}else{
			cur_bridge->notification_topic = mosquitto__strdup(notification_topic->valuestring);
			if(!cur_bridge->notification_topic){
				log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
				cJSON_Delete(message_json);
				return MOSQ_ERR_NOMEM;
			}
		}
	}
    local_clientid = cJSON_GetObjectItemCaseSensitive(bridge_json, "local_clientid");
    if(cJSON_IsString(local_clientid) && (local_clientid->valuestring != NULL)) {
        if(!strcmp(local_clientid->valuestring, "\"\"")){
            cur_bridge->local_clientid = NULL;
        }else{
            cur_bridge->local_clientid = mosquitto__strdup(local_clientid->valuestring);
            if(!cur_bridge->local_clientid){
                log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
                cJSON_Delete(message_json);
                return MOSQ_ERR_NOMEM;
            }
        }
    }
	remote_clientid = cJSON_GetObjectItemCaseSensitive(bridge_json, "remote_clientid");
	if(cJSON_IsString(remote_clientid) && (remote_clientid->valuestring != NULL)) {
		if(!strcmp(remote_clientid->valuestring, "\"\"")){
			cur_bridge->remote_clientid = NULL;
		}else{
			cur_bridge->remote_clientid = mosquitto__strdup(remote_clientid->valuestring);
			if(!cur_bridge->remote_clientid){
				log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
				cJSON_Delete(message_json);
				return MOSQ_ERR_NOMEM;
			}
		}
	}

  //Last verification
	if(cur_bridge->address_count == 0){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty address value in configuration.");
		cJSON_Delete(message_json);
		return MOSQ_ERR_INVAL;
	}
    if(cur_bridge->topic_count == 0){
        log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty topics value in configuration.");
        cJSON_Delete(message_json);
        return MOSQ_ERR_INVAL;
    }
	if(config->bridge_count == 0){
		log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty connection value in configuration.");
		cJSON_Delete(message_json);
		return MOSQ_ERR_INVAL;
    }
  cJSON_Delete(message_json);
  return MOSQ_ERR_SUCCESS;
}
/*
 * Parse a line-oriented (mosquitto.conf style) description of a new dynamic
 * bridge and append the resulting bridge definition to config->bridges.
 *
 * The payload is split on '\n'; lines starting with '#', CR or LF are ignored.
 * Each remaining line is "<keyword> <values...>" separated by single spaces.
 * Recognised keywords:
 *   connection <name>                      - starts a new bridge (must come first)
 *   address|addresses <host[:port]> ...    - port defaults to 1883
 *   topic <pattern> [out|in|both [qos [local_prefix [remote_prefix]]]]
 *   try_private true|false
 *   notification_topic / remote_username / local_clientid / remote_clientid <value>
 * A topic or prefix written as two double-quote characters means "unset".
 *
 * db      - used to reject a connection name that already exists in db->bridges.
 * payload - NUL-terminated, writable buffer; it is modified in place by the
 *           tokeniser.
 * config  - receives the new bridge entry.
 *
 * Returns MOSQ_ERR_SUCCESS when at least three recognised keywords were
 * processed, MOSQ_ERR_INVAL on malformed/insufficient input, or
 * MOSQ_ERR_NOMEM. On failure any partially populated bridge entry is left in
 * config; this function does not roll it back.
 */
int bridge__dynamic_parse_payload_new(struct mosquitto_db *db, void* payload, struct mosquitto__config *config)
{
	char *buf = NULL;
	char *token;
	char *address;
	char *line_saveptr = NULL; /* tokeniser state: payload -> lines */
	char *saveptr = NULL;      /* tokeniser state: line -> words */
	char *addr_saveptr = NULL; /* tokeniser state: "host:port" -> parts */
	struct mosquitto__bridge *cur_bridge = NULL;
	struct mosquitto__bridge *new_bridges;
	struct bridge_address *new_addresses;
	struct mosquitto__bridge_topic *cur_topic;
	struct mosquitto__bridge_topic *new_topics;
	size_t line_len;
	int tmp_int;
	int i;
	int len;
	int nb_param = 0;

	if(!payload) return MOSQ_ERR_INVAL;

	/* strtok_r rather than strtok for the outer split: no hidden static state. */
	for(buf = strtok_r(payload, "\n", &line_saveptr); buf; buf = strtok_r(NULL, "\n", &line_saveptr)){
		if(buf[0] == '#' || buf[0] == 10 || buf[0] == 13){
			continue;
		}
		/* Strip trailing CR/LF. */
		line_len = strlen(buf);
		while(line_len > 0 && (buf[line_len-1] == 10 || buf[line_len-1] == 13)){
			buf[--line_len] = 0;
		}

		token = strtok_r(buf, " ", &saveptr);
		if(!token){
			continue;
		}

		if(!strcmp(token, "address") || !strcmp(token, "addresses")){
			nb_param++;
			if(!cur_bridge || cur_bridge->addresses){
				log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
				return MOSQ_ERR_INVAL;
			}
			while((token = strtok_r(NULL, " ", &saveptr))){
				/* Split "host[:port]" with its own state so the word
				 * tokeniser above is not disturbed. */
				address = strtok_r(token, ":", &addr_saveptr);
				if(!address){
					/* Token consisted only of ':' characters. */
					log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
					return MOSQ_ERR_INVAL;
				}
				token = strtok_r(NULL, ":", &addr_saveptr);
				if(token){
					tmp_int = atoi(token);
					if(tmp_int < 1 || tmp_int > UINT16_MAX){
						log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid port value (%d).", tmp_int);
						return MOSQ_ERR_INVAL;
					}
				}else{
					tmp_int = 1883;
				}

				/* Grow through a temporary and only count the entry once it
				 * owns a heap copy of the host name. */
				new_addresses = mosquitto__realloc(cur_bridge->addresses, sizeof(struct bridge_address)*(size_t)(cur_bridge->address_count+1));
				if(!new_addresses){
					log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
					return MOSQ_ERR_NOMEM;
				}
				cur_bridge->addresses = new_addresses;
				cur_bridge->addresses[cur_bridge->address_count].address = mosquitto__strdup(address);
				if(!cur_bridge->addresses[cur_bridge->address_count].address){
					log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
					return MOSQ_ERR_NOMEM;
				}
				cur_bridge->addresses[cur_bridge->address_count].port = (uint16_t)tmp_int;
				cur_bridge->address_count++;
			}
			if(cur_bridge->address_count == 0){
				log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty address value in configuration.");
				return MOSQ_ERR_INVAL;
			}
		}else if(!strcmp(token, "connection")){
			nb_param++;
			token = strtok_r(NULL, " ", &saveptr);
			if(!token){
				log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty connection value in configuration.");
				return MOSQ_ERR_INVAL;
			}
			/* Check for existing bridge name. */
			for(i=0; i<db->bridge_count; i++){
				if(!strcmp(db->bridges[i]->bridge->name, token)){
					log__printf(NULL, MOSQ_LOG_ERR, "Error: Duplicate bridge name \"%s\".", token);
					return MOSQ_ERR_INVAL;
				}
			}

			new_bridges = mosquitto__realloc(config->bridges, (size_t)(config->bridge_count+1)*sizeof(struct mosquitto__bridge));
			if(!new_bridges){
				log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
				return MOSQ_ERR_NOMEM;
			}
			config->bridges = new_bridges;
			config->bridge_count++;
			cur_bridge = &(config->bridges[config->bridge_count-1]);
			memset(cur_bridge, 0, sizeof(struct mosquitto__bridge));
			cur_bridge->name = mosquitto__strdup(token);
			if(!cur_bridge->name){
				log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
				return MOSQ_ERR_NOMEM;
			}
			cur_bridge->keepalive = 60;
			cur_bridge->notifications = true;
			cur_bridge->notifications_local_only = false;
			cur_bridge->start_type = bst_automatic;
			cur_bridge->idle_timeout = 60;
			cur_bridge->restart_timeout = 0;
			cur_bridge->backoff_base = 5;
			cur_bridge->backoff_cap = 30;
			cur_bridge->threshold = 10;
			cur_bridge->try_private = true;
			cur_bridge->attempt_unsubscribe = true;
			cur_bridge->protocol_version = mosq_p_mqtt311;
			cur_bridge->primary_retry_sock = INVALID_SOCKET;
		}else if(!strcmp(token, "try_private")){
			nb_param++;
			if(!cur_bridge){
				log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
				return MOSQ_ERR_INVAL;
			}
			token = strtok_r(NULL, " ", &saveptr);
			if(token){
				if(!strcmp(token,"false")){
					cur_bridge->try_private = false;
				}else if(strcmp(token,"true")){
					/* Anything other than "true"/"false" is rejected; "true"
					 * keeps the default set when the bridge was created. */
					log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
					return MOSQ_ERR_INVAL;
				}
			}
		}else if(!strcmp(token, "notification_topic")){
			nb_param++;
			if(!cur_bridge){
				log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
				return MOSQ_ERR_INVAL;
			}
			token = strtok_r(NULL, " ", &saveptr);
			if(token){
				cur_bridge->notification_topic = mosquitto__strdup(token);
				if(!cur_bridge->notification_topic){
					log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
					return MOSQ_ERR_NOMEM;
				}
			}
		}else if(!strcmp(token, "remote_username")){
			log__printf(NULL, MOSQ_LOG_INFO, "Found remote_username");
			nb_param++;
			if(!cur_bridge){
				log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
				return MOSQ_ERR_INVAL;
			}
			token = strtok_r(NULL, " ", &saveptr);
			if(token){
				cur_bridge->remote_username = mosquitto__strdup(token);
				if(!cur_bridge->remote_username){
					log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
					return MOSQ_ERR_NOMEM;
				}
			}
		}else if(!strcmp(token, "local_clientid")){
			nb_param++;
			if(!cur_bridge){
				log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
				return MOSQ_ERR_INVAL;
			}
			token = strtok_r(NULL, " ", &saveptr);
			if(token){
				cur_bridge->local_clientid = mosquitto__strdup(token);
				if(!cur_bridge->local_clientid){
					log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
					return MOSQ_ERR_NOMEM;
				}
			}
		}else if(!strcmp(token, "remote_clientid")){
			nb_param++;
			if(!cur_bridge){
				log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
				return MOSQ_ERR_INVAL;
			}
			token = strtok_r(NULL, " ", &saveptr);
			if(token){
				cur_bridge->remote_clientid = mosquitto__strdup(token);
				if(!cur_bridge->remote_clientid){
					log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
					return MOSQ_ERR_NOMEM;
				}
			}
		}else if(!strcmp(token, "topic")){
			nb_param++;
			if(!cur_bridge){
				log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge configuration.");
				return MOSQ_ERR_INVAL;
			}
			token = strtok_r(NULL, " ", &saveptr);
			if(!token){
				log__printf(NULL, MOSQ_LOG_ERR, "Error: Empty topic value in configuration.");
				return MOSQ_ERR_INVAL;
			}

			new_topics = mosquitto__realloc(cur_bridge->topics,
					sizeof(struct mosquitto__bridge_topic)*(size_t)(cur_bridge->topic_count+1));
			if(!new_topics){
				log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
				return MOSQ_ERR_NOMEM;
			}
			cur_bridge->topics = new_topics;
			cur_bridge->topic_count++;
			cur_topic = &cur_bridge->topics[cur_bridge->topic_count-1];
			/* Zero the slot so no field holds an indeterminate pointer if we
			 * bail out before the entry is complete. */
			memset(cur_topic, 0, sizeof(struct mosquitto__bridge_topic));
			cur_topic->direction = bd_out;
			cur_topic->qos = 0;

			if(strcmp(token, "\"\"")){
				cur_topic->topic = mosquitto__strdup(token);
				if(!cur_topic->topic){
					log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
					return MOSQ_ERR_NOMEM;
				}
			}

			/* Optional trailing fields: direction, qos, local prefix, remote
			 * prefix - each only considered when the previous one is present. */
			token = strtok_r(NULL, " ", &saveptr);
			if(token){
				if(!strcasecmp(token, "out")){
					cur_topic->direction = bd_out;
				}else if(!strcasecmp(token, "in")){
					cur_topic->direction = bd_in;
				}else if(!strcasecmp(token, "both")){
					cur_topic->direction = bd_both;
				}else{
					log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic direction '%s'.", token);
					return MOSQ_ERR_INVAL;
				}
				token = strtok_r(NULL, " ", &saveptr);
				if(token){
					/* Range-check as int BEFORE narrowing: once stored in a
					 * uint8_t, negative or >255 values would wrap and could
					 * pass the check. */
					tmp_int = atoi(token);
					if(tmp_int < 0 || tmp_int > 2){
						log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge QoS level '%s'.", token);
						return MOSQ_ERR_INVAL;
					}
					cur_topic->qos = (uint8_t)tmp_int;

					token = strtok_r(NULL, " ", &saveptr);
					if(token){
						cur_bridge->topic_remapping = true;
						if(strcmp(token, "\"\"")){
							if(mosquitto_pub_topic_check(token) != MOSQ_ERR_SUCCESS){
								log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic local prefix '%s'.", token);
								return MOSQ_ERR_INVAL;
							}
							cur_topic->local_prefix = mosquitto__strdup(token);
							if(!cur_topic->local_prefix){
								log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
								return MOSQ_ERR_NOMEM;
							}
						}

						token = strtok_r(NULL, " ", &saveptr);
						if(token && strcmp(token, "\"\"")){
							if(mosquitto_pub_topic_check(token) != MOSQ_ERR_SUCCESS){
								log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge topic remote prefix '%s'.", token);
								return MOSQ_ERR_INVAL;
							}
							cur_topic->remote_prefix = mosquitto__strdup(token);
							if(!cur_topic->remote_prefix){
								log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
								return MOSQ_ERR_NOMEM;
							}
						}
					}
				}
			}

			/* An unset topic is only meaningful as a pure prefix-to-prefix remap. */
			if(cur_topic->topic == NULL &&
					(cur_topic->local_prefix == NULL || cur_topic->remote_prefix == NULL)){
				log__printf(NULL, MOSQ_LOG_ERR, "Error: Invalid bridge remapping.");
				return MOSQ_ERR_INVAL;
			}

			/* local_topic = local_prefix + topic (either part may be absent). */
			if(cur_topic->local_prefix){
				if(cur_topic->topic){
					len = (int)strlen(cur_topic->topic) + (int)strlen(cur_topic->local_prefix)+1;
					cur_topic->local_topic = mosquitto__malloc((size_t)len+1);
					if(!cur_topic->local_topic){
						log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
						return MOSQ_ERR_NOMEM;
					}
					snprintf(cur_topic->local_topic, (size_t)len+1, "%s%s", cur_topic->local_prefix, cur_topic->topic);
					cur_topic->local_topic[len] = '\0';
				}else{
					cur_topic->local_topic = mosquitto__strdup(cur_topic->local_prefix);
					if(!cur_topic->local_topic){
						log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
						return MOSQ_ERR_NOMEM;
					}
				}
			}else{
				cur_topic->local_topic = mosquitto__strdup(cur_topic->topic);
				if(!cur_topic->local_topic){
					log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
					return MOSQ_ERR_NOMEM;
				}
			}

			/* remote_topic = remote_prefix + topic (either part may be absent). */
			if(cur_topic->remote_prefix){
				if(cur_topic->topic){
					len = (int)strlen(cur_topic->topic) + (int)strlen(cur_topic->remote_prefix)+1;
					cur_topic->remote_topic = mosquitto__malloc((size_t)len+1);
					if(!cur_topic->remote_topic){
						log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
						return MOSQ_ERR_NOMEM;
					}
					snprintf(cur_topic->remote_topic, (size_t)len+1, "%s%s", cur_topic->remote_prefix, cur_topic->topic);
					cur_topic->remote_topic[len] = '\0';
				}else{
					cur_topic->remote_topic = mosquitto__strdup(cur_topic->remote_prefix);
					if(!cur_topic->remote_topic){
						log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
						return MOSQ_ERR_NOMEM;
					}
				}
			}else{
				cur_topic->remote_topic = mosquitto__strdup(cur_topic->topic);
				if(!cur_topic->remote_topic){
					log__printf(NULL, MOSQ_LOG_ERR, "Error: Out of memory.");
					return MOSQ_ERR_NOMEM;
				}
			}
		}
		/* Unrecognised keywords are ignored. */
	}

	/* Require at least three recognised keyword lines. */
	if(nb_param>=3){
		return MOSQ_ERR_SUCCESS;
	}else{
		return MOSQ_ERR_INVAL;
	}
}

de-wu avatar Sep 19 '22 15:09 de-wu