Wednesday, March 21, 2012

Passing around message contents as Streams (conversion to/from String & XmlDoc's).

More of a general "Streams" question than broker specific, but this is all being done in the context of Broker passing the messages around. The use of Streams & Encoding seems to be my problem and I'm not as familiar with Streams as I am with other areas of the Framework... Any advice would be appreciated.

At this point, I've created my own objects/stored procedures based loosely on the ServiceBrokerInterface class provided in the SQL Server samples. Some of this was done for simplification and as a learning exercise, but also to ensure that all of the SQL operations are being done via Stored Procedures and not inline SQL. This was done to adhere to our existing security policy used on this project.

In this "interface" I've built, I have a [BrokerMessage.cs] class which is meant to have a few additional pieces of functionality beyond what the MS provided version had supplied.

1st... A constructor for accepting either String or XmlDocument as the "content"

2nd... Methods to return either a XmlDocument or a simple String.

Since all of the Broker functionality is defined as using VARBINARY(MAX) in my stored procedures, I don't believe I have any problems at that level. It's simply a binary blob to Broker.

In my constructor for accepting String or XmlDocuments, I attempted to use the following...

public BrokerMessage(string type, XmlDocument contents)
{
    m_type = type;
    m_contents = new MemoryStream(EncodingProvider.GetBytes(contents.InnerXml));
}

My understanding was that MemoryStream is derived from Stream so I can implicitly cast it. The "EncodingProvider" is a static member set as follows:

public static Encoding EncodingProvider = Encoding.Unicode;

This way I ensure that internal & external code can all be set to use the same encoding and easily changed if necessary. I was hoping to avoid using Unicode since the rest of the application does not require it, but from my understanding all Xml documents in SQL Server are Unicode based, so this should be a better encoding choice for any processing that may potentially occur within SQL Server itself.

In my methods to return the various forms of the "Stream", I have the following code... The ToBytes() method is what is used to pass into the stored procedure parameter that is defined as VarBinary and expecting a byte array. One area of concern is that the Read method accepts an INT for the length, but the actual Length property is a LONG. I'm sure there's a better way to handle this and I would welcome any advice there.

/// <summary>
/// Used to convert from a Stream back to a simple Byte array.
/// </summary>
/// <returns></returns>
public virtual byte[] ToBytes()
{
    byte[] results = new byte[this.Contents.Length];
    this.Contents.Read(results, 0, (int)this.Contents.Length);
    return results;
}

/// <summary>
/// Used to convert from a Stream back to a simple String.
/// </summary>
/// <returns></returns>
public new string ToString()
{
    byte[] buffer = this.ToBytes();
    String results = EncodingProvider.GetString(buffer);
    return results;
}

/// <summary>
/// Used to convert from a Stream back to a simple XmlDocument.
/// </summary>
/// <returns></returns>
public virtual XmlDocument ToXmlDocument()
{
    XmlDocument results = new XmlDocument();
    results.InnerText = this.ToString();
    return results;
}

Further confusion & frustration. I'm frustrated that the stream Read methods all use INT for parameters. Which makes no sense since a Stream length can be Long. Even when reading in a "chunk" at a time, the "offset" in the buffer might potentially need to be a Long... even if the "read size" is just an Int. argh...

So I revised my code as below. Not quite a finished piece of code, but more of a draft to get me going. I thought using one of these "Readers", as the HelloWorld_CLR sample does, might hold some answers. Not the case. Both StreamReader and BinaryReader are giving me no results.

This appears to be because after "using" the Stream once to populate a database table (this is on a received message populated from a SqlBytes/Reader), subsequent access fails. When I look at the underlying datatype, it is of the [SqlTypes.StreamOnSqlBytes] type. One thing I noticed in the ServiceBrokerInterface / Message.cs class is that when populating from the DataReader, they call the m_XXXXX variables for everything BUT the actual message content. For that case, they call the Body Property Set which is defined as Stream just like the underlying m_Body member. Seemed odd, so I followed suit in my class but that did not resolve the problem.

So here's the catch. When debugging this in SQL/CLR I did come across something odd. The Position is always 64 and the length 64. When I first consume the stream, Position was at zero. So I explicitly put calls in to move the "position". I do this via BOTH the Position and Seek members. The result is that Position stays at 64. I can see through QuickWatch that the "CanSeek" value is "true". So this makes no sense. Now, to further confuse the issue: when I tried to set the Position property via QuickWatch, it would not change.... BUT.... when I hovered over it until IntelliSense displayed the value and then edited it there... the property "took" and was at zero. Attempting to perform a "Read" on the stream at that point returned my data as it should. This was not always a repeatable exercise, but 1 out of 3 or so... very inconsistent. When I took that byte array and did a "GetString" from my Encoding... I did see my original Xml message as it should be. So this confirms that I'm properly sending the message and that it's on the "reception" side of things where I'm having trouble.

Any Stream experts out there care to chime in? Streams shouldn't be this difficult to utilize. What am I missing here?

protected virtual byte[] ToBytes()
{
    long readBytes = 0;
    int readSize = 1024;
    int lastRead = 0;
    int position = 0;

    this.Contents.Position = 0;
    this.Contents.Seek(0, SeekOrigin.Begin);

    BinaryReader reader = new BinaryReader(this.Contents, EncodingProvider);
    byte[] results = new byte[this.Contents.Length];

    while (readBytes < this.Contents.Length)
    {
        if ((this.Contents.Length - readBytes) < readSize)
        {
            readSize = (int)(this.Contents.Length - readBytes);
        }

        lastRead = reader.Read(results, position, readSize);
        readBytes += readSize;
        position = (int)readBytes + 1;
    }

    return results;
}

|||

After looking at how SqlBytes was being used in the population, I attempted the following and this works. I can't explain why the previous attempts did not work or why this one does.... but I am getting back the proper array of bytes.

protected virtual byte[] ToBytes()
{
    SqlBytes bytes = new SqlBytes(this.Contents);
    byte[] results = bytes.Buffer;
    return results;
}

|||

By default, SqlCommand.ExecuteReader will read one row at a time. So if you were doing a RECEIVE, it will bring the entire row including the message body into an in-memory buffer. In that case, the SqlBytes object created by DataReader.GetSqlBytes() contains a buffer that has the entire message_body inside it. If you want true streaming, you will have to specify the CommandBehavior to be SequentialAccess in the call to ExecuteReader. While reading the columns you must read in the order they are declared in the query. GetSqlBytes() will return a true stream and the Buffer property will not give you the entire message at once.

As for your question about why the Stream.Read API takes an int-sized offset argument: the offset is not a position in the stream itself but an index into the buffer you supply to the Read method. Since a single byte array cannot hold more than about 2^31 elements (it is indexed with an int), an int offset is all that is needed.
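To make the buffer-offset point concrete, here is a minimal sketch of a read loop. The class and method names (StreamHelpers, ReadAllBytes) are hypothetical and not part of the original posts, and the sketch assumes a seekable stream whose Length is known. Compared with the draft ToBytes in the question, it advances the buffer offset by the number of bytes Read actually returned, without adding 1.

```csharp
using System;
using System.IO;

public static class StreamHelpers
{
    // Hypothetical helper: copies a seekable stream into a new byte array.
    // Assumes source.CanSeek is true and source.Length fits in one array.
    public static byte[] ReadAllBytes(Stream source)
    {
        source.Position = 0;                      // rewind before reading
        byte[] results = new byte[source.Length];
        int filled = 0;                           // index into results, NOT into the stream

        while (filled < results.Length)
        {
            // Ask for at most 1024 bytes; Read may return fewer than requested.
            int wanted = Math.Min(1024, results.Length - filled);
            int read = source.Read(results, filled, wanted);
            if (read == 0)
            {
                // The stream ended before Length bytes were delivered.
                throw new EndOfStreamException("Stream ended early.");
            }
            filled += read;                       // advance by what was actually read
        }

        return results;
    }
}
```

Calling StreamHelpers.ReadAllBytes(stream) on a MemoryStream that has already been read to the end still returns the full contents, because the method rewinds first. Whether a rewind behaves the same way on the StreamOnSqlBytes stream described in the question is not something this sketch verifies.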

Finally, you are already making an assumption that the message size will fit in memory (since you are using XmlDocument and string). So why bother with streams at all? You could simply use DataReader.GetBytes(), which copies the column into a byte array that can then be decoded and parsed into an XmlDocument object.
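A minimal sketch of that decode-and-parse step follows. The names MessageDecoding and ToXmlDocument are hypothetical, and the sketch assumes the sender encoded the XML text with the same Encoding that is passed in (Encoding.Unicode in the question) and did not prepend a byte order mark. It uses XmlDocument.LoadXml, which parses markup; assigning the text to InnerText, as the ToXmlDocument method in the question does, does not parse markup.

```csharp
using System;
using System.Text;
using System.Xml;

public static class MessageDecoding
{
    // Hypothetical helper: turns a message body back into an XmlDocument.
    // Assumption: body holds XML text encoded with the given encoding, no BOM.
    public static XmlDocument ToXmlDocument(byte[] body, Encoding encoding)
    {
        // Decode the raw bytes back into the XML text.
        string xml = encoding.GetString(body);

        // LoadXml parses the text as markup and builds the node tree.
        XmlDocument doc = new XmlDocument();
        doc.LoadXml(xml);
        return doc;
    }
}
```

For example, bytes produced by Encoding.Unicode.GetBytes(doc.InnerXml) on the sending side can be passed to MessageDecoding.ToXmlDocument(bytes, Encoding.Unicode) on the receiving side.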

Rushi

|||

Digging through ADO.NET's ExecuteReader implementation using Reflector, it seems that the streaming behavior using SqlBytes is only available for SQLCLR "context connections" and not for TDS. So effectively, for Service Broker external activation, CommandBehavior.SequentialAccess does not provide much of a benefit as far as streaming functionality is concerned. Can anybody confirm this?

Thanks,

Ramkishore
