Unity3D.Amqp
Unity3D.Amqp copied to clipboard
How would I send messages to an exchange?
Basically, I want to be able to send messages to the queue or exchange that drives the movement of the cubes.
I have the following code written for a C# console app, but how would I convert this to your Unity code?
foreach (string greenhouselist in Greenhouse.GreenhouseList)
{
channel.QueueDeclare($"{greenhouselist}Queue",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
//channel.ExchangeDeclare("amq.topic", ExchangeType.Fanout);
var Count = 0;
foreach (string Sensor in Sensor.SensorList)
{
//var message = new { Name = "Plant Event", Location = greenhouselist, Sensor = Sensor, Saverity = "high", Message = $"Hello! Water me!! Count:{Count}" };
//var body = Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
//// string query = "INSERT INTO dbo.greenhouse.message (Name,Price,Date) VALUES('LED Screen','$120','27 January 2017')";
////SqlCommand cmd = new SqlCommand(query, con);
//channel.BasicPublish("", $"{greenhouselist}Queue", null, body);
foreach (string cube in cube.cubeList)
{
Random random = new Random();
//{ "id":"cube2" "rotY":45 }
//{ "id":"cube2", "rotY":45 }
//[{ "id":"cube1", "rotX":45 }, { "id":"cube2", "rotY":45 }, { "id":"cube3", "rotZ":45 }]
var message1 = @"{ ""id"":""cube2"", ""rotY"":45 }";
var message = @"[{ ""id"":""cube2"", ""rotY"":45 }, { ""id"":""cube1"", ""rotX"":90, ""rotY"":23 }, { ""id"":""cube3"", ""rotZ"":29 }]";
var message2 = System.Text.RegularExpressions.Regex.Replace(message, "[@\\.;'\\\\]", "");
//var message2 = message.Replace(@"\", string.Empty);
//var message = new { id = "Plant Event", rot = greenhouselist, Sensor = Sensor};
var body2 = Encoding.UTF8.GetBytes(message2);
channel.BasicPublish(exchange: "amq.topic",
routingKey: "amqpdemo.objects",
basicProperties: null,
body: body2);
Count++;
Thread.Sleep(1000);
}
Count++;
Thread.Sleep(1000);
}
}
}
`using System.Collections.Generic; using UnityEngine; using UnityEngine.UI; using CymaticLabs.Unity3D.Amqp.SimpleJSON;
namespace CymaticLabs.Unity3D.Amqp.UI
{
// NOTE(review): the class declaration that should open here is not visible in this paste.
// The members below read as the body of the SensorScript behaviour (see Instance) — confirm.
// NOTE(review): UpdatePosition, UpdateRotation, UpdateScale, UpdateInWorldSpace, exchanges and
// exSubscriptions are used by methods further down but are not declared in the visible source —
// confirm they are declared elsewhere.

/// <summary>
/// Exchange name handed to the subscription that Start() creates.
/// </summary>
[Tooltip("The name of the exchange to subscribe to.")]
public string ExchangeName;
/// <summary>
/// Exchange type handed to the subscription that Start() creates; defaults to Topic.
/// </summary>
[Tooltip("The exchange type for the exchange being subscribed to. It is important to get this value correct as the RabbitMQ client will close a connection if you pass the wrong type for an already declared exchange.")]
public AmqpExchangeTypes ExchangeType = AmqpExchangeTypes.Topic;
/// <summary>
/// Routing key handed to the subscription that Start() creates.
/// </summary>
[Tooltip("The optional routing key to use when subscribing to the exchange. This mostly applies to 'topic' exchanges.")]
public string RoutingKey;
/// <summary>
/// Gates the diagnostic log output of RegisterObject(), UnregisterObject() and UpdateObject().
/// </summary>
[Tooltip("When enabled received messages will be logged to the debug console.")]
public bool DebugLogMessages = false;
#region Fields
// Internal look-up table of object references given their AMQP ID.
// Created in Awake(), filled by RegisterObject(), emptied by UnregisterObject(), read by UpdateObject().
private Dictionary<string, AmqpObjectControlReference> objectsById;
#endregion Fields
#region Properties
/// <summary>
/// Gets the static, singleton instance of the behaviour. Assigned in Awake().
/// </summary>
public static SensorScript Instance { get; private set; }
#endregion Properties
#region Methods
// Prepares the behaviour's internal state: the lookup table and the shared instance.
private void Awake()
{
    // Start with an empty AMQP-id -> object reference table
    objectsById = new Dictionary<string, AmqpObjectControlReference>();

    // Expose this component through the static accessor
    Instance = this;
}
// *Note*: AMQP library calls belong in Start() rather than Awake(), because
// the AmqpClient initializes itself in Awake() and is not ready before then.
private void Start()
{
    /*
     * Build a subscription from the inspector values and hand it to the client in one step.
     * The static Subscribe() call does not work when several AmqpClient instances are in use;
     * in that case expose an inspector property of type 'AmqpClient', assign the connection
     * you want, and call its non-static 'SubscribeToExchange()' method instead.
     */
    AmqpClient.Subscribe(
        new AmqpExchangeSubscription(ExchangeName, ExchangeType, RoutingKey, HandleExchangeMessageReceived));
}
// Minimum number of seconds between two automatic publishes issued from Update().
private const float PublishIntervalSeconds = 1f;

// Earliest Time.time value at which Update() is allowed to publish again.
private float nextPublishTime;

// Publishes at most once per PublishIntervalSeconds.
// Update() runs once per rendered frame, so calling Publish() unconditionally
// would send a message on every frame and flood the exchange.
private void Update()
{
    if (Time.time < nextPublishTime) return;

    nextPublishTime = Time.time + PublishIntervalSeconds;
    Publish();
}
/// <summary>
/// Publishes a fixed message to the "amq.topic" exchange using the routing key "amqpdemo.objects".
/// Nothing is published when the exchange name or the message is blank; each problem is
/// reported in red on the AmqpConsole instead.
/// </summary>
public void Publish()
{
    // Validate args
    var isValid = true;

    var exchangeName = "amq.topic";
    if (string.IsNullOrEmpty(exchangeName))
    {
        isValid = false;
        AmqpConsole.Color = Color.red;
        AmqpConsole.WriteLine("* Exchange Name cannot be blank");
        AmqpConsole.Color = null;
    }

    // Placeholder payload. UpdateObject() in this script reads JSON properties such as
    // "id", "posX", "rotY" and "sclZ", e.g. { "id":"cube2", "rotY":45 }.
    var message = "insert some sort of message here probably json or xml";
    if (string.IsNullOrEmpty(message))
    {
        isValid = false;
        AmqpConsole.Color = Color.red;
        AmqpConsole.WriteLine("* Message cannot be blank");
        AmqpConsole.Color = null;
    }

    // Don't continue if values are invalid
    if (!isValid) return;

    // The exchange-type lookup over the cached exchange list was removed: its result was never
    // used, and the list is only assigned in FinishConnected(), so iterating it before that
    // callback had run threw a NullReferenceException.
    var routingKey = "amqpdemo.objects";

    // Publish the message
    AmqpClient.Publish(exchangeName, routingKey, message);
}
/// <summary>
/// Adds an AMQP object control reference to the controller's lookup table, keyed by its AMQP ID.
/// A reference without an ID is skipped with a warning; an ID that is already present is
/// overwritten with a warning.
/// </summary>
/// <param name="objRef">The object control reference to register.</param>
public void RegisterObject(AmqpObjectControlReference objRef)
{
    if (objRef == null) throw new System.ArgumentNullException("objRef");

    // A reference without an ID cannot be looked up later, so refuse it
    var amqpId = objRef.AmqpId;
    if (string.IsNullOrEmpty(amqpId))
    {
        Debug.LogWarningFormat("AMQP Control Object Reference is missing its ID: {0}", objRef.name);
        return;
    }

    // Warn when an existing entry is about to be replaced
    if (objectsById.ContainsKey(amqpId))
    {
        Debug.LogWarningFormat("AMQP Control Object Reference with ID has already been registered: {0}", amqpId);
    }

    // The indexer inserts a new entry or replaces the existing one
    objectsById[amqpId] = objRef;

    if (DebugLogMessages)
    {
        Debug.LogFormat("AMQP Control Object registered with ID {0} => {1}", amqpId, objRef.name);
    }
}
/// <summary>
/// Removes an AMQP object control reference from the controller's lookup table.
/// A reference without an ID is skipped with a warning; an unknown ID is ignored.
/// </summary>
/// <param name="objRef">The object control reference to unregister.</param>
public void UnregisterObject(AmqpObjectControlReference objRef)
{
    if (objRef == null) throw new System.ArgumentNullException("objRef");

    // Without an ID there is no key to remove
    var amqpId = objRef.AmqpId;
    if (string.IsNullOrEmpty(amqpId))
    {
        Debug.LogWarningFormat("AMQP Control Object Reference is missing its ID: {0}", objRef.name);
        return;
    }

    // Remove() reports whether an entry was actually present
    var wasRemoved = objectsById.Remove(amqpId);
    if (wasRemoved && DebugLogMessages)
    {
        Debug.LogFormat("AMQP Control Object Reference unregistere {0}", amqpId);
    }
}
/**
 * Handles messages received from this object's subscription based on the exchange name,
 * exchange type, and routing key used. You could also write an anonymous delegate in line
 * when creating the subscription like: (received) => { Debug.Log(received.Message.Body.Length); }
 *
 * The body is decoded as UTF-8 text, logged, and parsed as JSON. A JSON array is applied
 * element by element through UpdateObject(); anything else is passed to UpdateObject() as a
 * single message.
 */
void HandleExchangeMessageReceived(AmqpExchangeReceivedMessage received)
{
    // First convert the message's body, which is a byte array, into a string for parsing the JSON
    var receivedJson = System.Text.Encoding.UTF8.GetString(received.Message.Body);
    Debug.Log(receivedJson);

    /**
     * Parse the JSON message
     * This example uses the SimpleJSON parser which is included in the AMQP library.
     * You can find out more about this parser here: http://wiki.unity3d.com/index.php/SimpleJSON
     */
    var parsed = JSON.Parse(receivedJson);

    // Decide array vs. single message from the parsed node rather than from the first
    // character of the text, so an array preceded by whitespace is still handled as an array.
    var msgList = parsed as JSONArray;
    if (msgList == null)
    {
        // Individual message (or nothing parseable, which UpdateObject() reports itself)
        UpdateObject(parsed);
        return;
    }

    for (var i = 0; i < msgList.Count; i++)
    {
        UpdateObject(msgList[i]);
    }
}
// Reads the float stored under 'key' in 'msg', or returns 'fallback' when the property is absent.
private static float ReadFloatOrDefault(JSONNode msg, string key, float fallback)
{
    var node = msg[key];
    return node != null ? node.AsFloat : fallback;
}

// Applies one update message to the registered object whose AMQP ID matches the message's 'id'.
// Position, rotation and scale are each applied only when the matching Update* switch is on;
// any component missing from the message keeps the object's current value.
void UpdateObject(JSONNode msg)
{
    // A message without an ID cannot be routed to an object
    var id = msg != null ? msg["id"].Value : null;
    if (string.IsNullOrEmpty(id))
    {
        if (DebugLogMessages) Debug.LogWarning("AMQP message received without 'id' property.");
        return;
    }

    // Resolve the ID to its registered object reference
    AmqpObjectControlReference objRef;
    if (!objectsById.TryGetValue(id, out objRef))
    {
        if (DebugLogMessages) Debug.LogWarningFormat("No AMQP Object Control Reference found for ID: {0}.", id);
        return;
    }

    var target = objRef.transform;

    if (UpdatePosition)
    {
        var current = UpdateInWorldSpace ? target.position : target.localPosition;
        var updated = new Vector3(
            ReadFloatOrDefault(msg, "posX", current.x),
            ReadFloatOrDefault(msg, "posY", current.y),
            ReadFloatOrDefault(msg, "posZ", current.z));

        if (UpdateInWorldSpace)
        {
            target.position = updated;
        }
        else
        {
            target.localPosition = updated;
        }
    }

    if (UpdateRotation)
    {
        var current = UpdateInWorldSpace ? target.eulerAngles : target.localEulerAngles;
        var updated = new Vector3(
            ReadFloatOrDefault(msg, "rotX", current.x),
            ReadFloatOrDefault(msg, "rotY", current.y),
            ReadFloatOrDefault(msg, "rotZ", current.z));

        if (UpdateInWorldSpace)
        {
            target.eulerAngles = updated;
        }
        else
        {
            target.localEulerAngles = updated;
        }
    }

    if (UpdateScale)
    {
        // Scale is always applied in local space
        var current = target.localScale;
        target.localScale = new Vector3(
            ReadFloatOrDefault(msg, "sclX", current.x),
            ReadFloatOrDefault(msg, "sclY", current.y),
            ReadFloatOrDefault(msg, "sclZ", current.z));
    }
}
/// <summary>
/// Publishes a message to the current exchange using the form's input values.
/// </summary>
#region Event Handlers
// Handles a connection event by requesting the exchange list from the client;
// the result is handed to FinishConnected().
// NOTE(review): nothing in the visible code registers this handler with the AmqpClient —
// confirm it is wired up elsewhere (e.g. in the inspector).
void HandleConnected(AmqpClient client)
{
//Connection.interactable = false;
//ConnectButton.interactable = false;
//DisconnectButton.interactable = true;
// Query exchange list
AmqpClient.GetExchangesAsync(FinishConnected);
}
// Finishes the connection event by receiving the async result of the exchange query and
// keeping it in the local 'exchanges' list. The drop-down population and UI enabling that
// used to follow are commented out below, so storing the list is the only effect.
void FinishConnected(AmqpExchange[] exchangeList)
{
// Copy list locally
exchanges = exchangeList;
//foreach (var exchange in exchanges)
//{
// if (exchange.Name == null || exchange.Name == "/") continue;
// var option = new Dropdown.OptionData(exchange.Name);
// ExchangeName.options.Add(option);
// PublishExchange.options.Add(option);
//}
//if (exchanges.Length > 0)
//{
// ExchangeName.RefreshShownValue();
// PublishExchange.RefreshShownValue();
//}
// Enable UI
//ExchangeName.interactable = true;
//RoutingKey.interactable = true;
//SubscribeButton.interactable = true;
//UnsubscribeButton.interactable = true;
//PublishButton.interactable = true;
//PublishExchange.interactable = true;
//PublishMessage.interactable = true;
//PublishRoutingKey.interactable = true;
}
// Handles a disconnection event. Currently does nothing: every statement in the body
// is commented out (they toggled UI controls that this script does not declare).
void HandleDisconnected(AmqpClient client)
{
//Connection.interactable = true;
//ConnectButton.interactable = true;
//DisconnectButton.interactable = false;
//ExchangeName.interactable = false;
//RoutingKey.interactable = false;
//SubscribeButton.interactable = false;
//UnsubscribeButton.interactable = false;
//PublishButton.interactable = false;
//PublishExchange.interactable = false;
//PublishMessage.interactable = false;
//PublishRoutingKey.interactable = false;
}
// Handles a reconnecting event. Intentionally empty placeholder: no action is taken.
void HandleReconnecting(AmqpClient client)
{
}
// Handles a blocked event. Intentionally empty placeholder: no action is taken.
void HandleBlocked(AmqpClient client)
{
}
// Handles exchange subscribes by recording the new subscription in 'exSubscriptions'.
// NOTE(review): 'exSubscriptions' is not declared in the visible source — confirm it exists and is initialized.
void HandleExchangeSubscribed(AmqpExchangeSubscription subscription)
{
// Add it to the local list
exSubscriptions.Add(subscription);
}
// Handles exchange unsubscribes by dropping the subscription from 'exSubscriptions'.
// NOTE(review): 'exSubscriptions' is not declared in the visible source — confirm it exists and is initialized.
void HandleExchangeUnsubscribed(AmqpExchangeSubscription subscription)
{
// Remove it from the local list
exSubscriptions.Remove(subscription);
}
#endregion Event Handlers
#endregion Methods
}
} `
So I imagine you just use the AMQP publish method, but I feel like I am missing something. I am an agriculture major, so coding isn't exactly my forte, but I try.