libpeer
libpeer copied to clipboard
Solved the timeout issue and enabled peer-to-peer connections between two ESP32 devices.
I've modified the library to accept incoming connections so that it can work as a callee, and I've added a STUN heartbeat to keep the connection alive. The changes are minimal. I've tested this using a data channel; to establish the data channel I used this Python code:
import paho.mqtt.client as mqtt
import json
import time
# MQTT broker configuration
MQTT_BROKER = "192.168.50.127"  # Change this to your MQTT broker address
MQTT_PORT = 1883  # Default MQTT port
MQTT_KEEPALIVE = 60  # Keepalive value passed to client.connect()

# MQTT topics. This script publishes requests to each peer's "invoke" topic
# and subscribes to each peer's "result" topic.
CALLER_INVOKE_TOPIC = "/c76c2b3a-74ee-48f3-8fc2-7e51a1bd6a74/invoke"
CALLER_RESULT_TOPIC = "/c76c2b3a-74ee-48f3-8fc2-7e51a1bd6a74/result"
CALLEE_INVOKE_TOPIC = "/86e786f8-b6ec-446d-bf6a-2a715f61c0ad/invoke"
CALLEE_RESULT_TOPIC = "/86e786f8-b6ec-446d-bf6a-2a715f61c0ad/result"

# MQTT client setup.
# paho-mqtt 2.x takes a callback API version as the first constructor
# argument, while paho-mqtt 1.x has no such enum. Pick the constructor form
# that matches the installed library so the script runs on both. Only
# on_message is used by this script, and its signature is the same in both.
_CALLBACK_API_VERSION = getattr(mqtt, "CallbackAPIVersion", None)
if _CALLBACK_API_VERSION is not None:
    client = mqtt.Client(_CALLBACK_API_VERSION.VERSION2)
else:
    client = mqtt.Client()

# ID mapping storage: maps the ID used towards the callee back to the ID of
# the originating caller request.
id_map = {}
def on_message(client, userdata, msg):
    """Relay JSON-RPC results between the caller and callee peers.

    A result received on the caller's result topic is forwarded to the
    callee's invoke topic as an ``answer`` request under a remapped ID. A
    result received on the callee's result topic is forwarded to the caller's
    invoke topic as an ``answer`` request under the ID recorded for it in
    ``id_map`` (or the received ID if no mapping exists). Messages on any
    other topic are parsed and then ignored.

    Args:
        client: MQTT client used to publish the forwarded request.
        userdata: Unused.
        msg: Received message; ``msg.topic`` and ``msg.payload`` are read.
    """
    # This runs on the MQTT network thread, so the outer handler is a
    # deliberate catch-all boundary: a malformed message is reported and
    # dropped instead of propagating out of the callback.
    try:
        payload = json.loads(msg.payload.decode())
        response_id = payload.get("id")
        response_result = payload.get("result")

        if msg.topic == CALLER_RESULT_TOPIC:
            print(f"[CALLER RESULT] Received (ID {response_id}): {payload}")

            # Map the caller request ID to a new ID for the callee
            # (example mapping logic to avoid conflicts). A missing or
            # non-numeric ID cannot be remapped, so report it explicitly
            # rather than surfacing an opaque TypeError.
            try:
                new_id = response_id + 1000
            except TypeError:
                print(
                    "[CALLER RESULT] Ignoring result with non-numeric ID "
                    f"{response_id!r}"
                )
                return

            # Drop any entry stored under the caller's own ID when the offer
            # was requested; it is never looked up again and would otherwise
            # accumulate for every request.
            id_map.pop(response_id, None)
            id_map[new_id] = response_id

            # Forward the caller's result to the callee.
            answer_request = {
                "jsonrpc": "2.0",
                "method": "answer",
                "params": response_result,
                "id": new_id,
            }
            print(f"[CALLEE INVOKE] Forwarding offer to callee with new ID {new_id}")
            client.publish(CALLEE_INVOKE_TOPIC, json.dumps(answer_request))

        elif msg.topic == CALLEE_RESULT_TOPIC:
            print(f"[CALLEE RESULT] Received (ID {response_id}): {payload}")

            # Recover the original caller ID; fall back to the received ID
            # when no mapping is known.
            original_id = id_map.get(response_id, response_id)

            # Send the callee's result back to the caller.
            final_answer = {
                "jsonrpc": "2.0",
                "method": "answer",
                "params": response_result,
                "id": original_id,
            }
            print(f"[CALLER INVOKE] Forwarding answer to caller with original ID {original_id}")
            client.publish(CALLER_INVOKE_TOPIC, json.dumps(final_answer))

            # The exchange is complete; release the mapping.
            id_map.pop(response_id, None)

    except Exception as e:
        print(f"Error processing message on {msg.topic}: {e}")
def main():
    """Connect to the broker and send an offer request each time "s" is entered.

    Registers ``on_message``, connects, subscribes to both result topics and
    starts the MQTT network loop in a background thread. Then reads lines
    from standard input until interrupted (Ctrl+C) or until input ends; each
    line equal to ``"s"`` publishes an ``offer`` request to the caller's
    invoke topic. The client is disconnected and the network loop stopped on
    every exit path.
    """
    # Connect MQTT callbacks
    client.on_message = on_message

    # Connect to MQTT broker
    client.connect(MQTT_BROKER, MQTT_PORT, MQTT_KEEPALIVE)

    # Subscribe to result topics
    client.subscribe(CALLER_RESULT_TOPIC)
    client.subscribe(CALLEE_RESULT_TOPIC)

    # Start MQTT loop in background thread
    client.loop_start()

    print("Press s and then enter to send offer request...")

    try:
        while True:
            try:
                key = input()
            except EOFError:
                # Standard input was closed; there is nothing more to read.
                break

            if key != "s":
                continue

            # Use the current timestamp as the request ID.
            # NOTE: resolution is one second, so two requests issued within
            # the same second share an ID.
            request_id = int(time.time())
            id_map[request_id] = request_id  # Store original mapping

            # Send WebRTC offer request to caller
            offer_request = {
                "jsonrpc": "2.0",
                "method": "offer",
                "params": "",
                "id": request_id,
            }
            print(f"[CALLER INVOKE] Sending offer request with ID {request_id}")
            client.publish(CALLER_INVOKE_TOPIC, json.dumps(offer_request))
    except KeyboardInterrupt:
        pass
    finally:
        # Close the session cleanly, then stop the background network thread.
        client.disconnect()
        client.loop_stop()
# Start the bridge only when this file is executed as a script.
if __name__ == "__main__":
    main()