Thursday, October 30, 2008

When one and one makes one

Some time ago I promised to write an article on TGpJoinedStream class. A time has come to fulfill the promise.

TGpJoinedStream is a stream wrapper class – a class that wraps one or more existing streams and provides a TStream interface on the outside. My GpStreams unit contains several examples of such wrappers: TGpStreamWindow provides a stream access to a part of another stream, TGpScatteredStream provides contiguous access to a scattered data (hm, I never wrote an article about that one, didn’t I?), and TGpBufferedStream adds data caching to any stream.

TGpJoinedStream’s role is simple. You pass it a bunch of streams and it provides stream access to concatenated data. IOW, if first stream contains numbers <1, 2, 3> and second stream contains <4, 5>, joined stream would function as if it contains <1, 2, 3, 4, 5>.

Implementation is pretty straightforward and won’t describe it here. I'll rather show how TGpJoinedStream is used in practice and why I wrote it at all.

We have a MPEG parser class that takes a TStream containing MPEG video data and extracts some information from it. It works fine when you need to extract information from a video file, but not so good if you want to use it inside a DirectX filter. From a viewpoint of a DirectX source or sink filter, video is not a stream but a bunch of buffers, which we must process one by one. The trouble is that buffers can almost never be fully processed because MPEG sequences don’t contain data length.

In MPEG, each sequence starts with byte signature 00 00 01, followed by a byte that tells you what kind of data you’re processing, followed by some data. Unless you want to parse each and every sequence data and you know exactly how all well-formed and malformed sequences in the existence are built, you only know that you reached the end of one sequence when you reach another 00 00 01 signature. That’s why the last sequence in the buffer can never be processed (unless this is the last buffer of them all) until we find 00 00 01 in the next buffer. Uff. I hope somebody understands this at all.

In short, we are processing data buffers. A variable-length part of each buffer will stay unprocessed and will have to be prepended to the next buffer before it is passed through the MPEG parser. And so on, until the end of data.

And that’s where TGpJoinedStream comes to help. The unprocessed part is stored in a memory stream FLeftovers, which is empty at the beginning. Buffer parser concatenates FLeftovers with the current data and passes the result through the stream parser. When that one returns, .Position in the combined stream indicates the first unprocessed byte. The unprocessed data is then copied into the FLeftovers stream so it can be used next time the buffer parser is called.

That’s how it looks in code (slightly simplified real code; I just removed some details that are not interesting for our story).

procedure TGpMPEGSequentialParser.ParseNext(buffer: pointer; 
bufferSize: integer);
var
combinedStream: TGpJoinedStream;
mpegParser : TGpMPEGParser;
newLeftovers : TMemoryStream;
strBuffer : TGpFixedMemoryStream;
begin
strBuffer := TGpFixedMemoryStream.Create(buffer^, bufferSize);
try
newLeftovers := TMemoryStream.Create;
try
combinedStream := TGpJoinedStream.Create([FLeftovers, strBuffer]);
try
mpegParser := TGpMPEGParser.Create;
try
mpegParser.MPEGStream := combinedStream;

mpegParser.Parse(HandleMpegSequence);

if combinedStream.Position < combinedStream.Size then
newLeftovers.CopyFrom(combinedStream,
combinedStream.Size - combinedStream.Position);
finally FreeAndNil(mpegParser); end;
finally FreeAndNil(combinedStream); end;
finally
FreeAndNil(FLeftovers);
FLeftovers := newLeftovers;
end;
finally FreeAndNil(strBuffer); end;
end;


Hope you like it!

Tuesday, October 21, 2008

Internet, as predicted in 1946

“You know the logics setup. You got a logic in your house. It looks like a vision receiver used to, only it's got keys instead of dials and you punch the keys for what you wanna get. It's hooked in to the tank, which has the Carson Circuit all fixed up with relays. Say you punch "Station SNAFU" on your logic. Relays in the tank take over an' whatever vision-program SNAFU is telecastin' comes on your logic's screen. Or you punch "Sally Hancock's Phone" an' the screen blinks an' sputters an' you're hooked up with the logic in her house an' if somebody answers you got a vision-phone connection. But besides that, if you punch for the weather forecast or who won today's race at Hialeah or who was mistress of the White House durin' Garfield's administration or what is PDQ and R sellin' for today, that comes on the screen too. The relays in the tank do it. The tank is a big buildin' full of all the facts in creation an' all the recorded telecasts that ever was made—an' it's hooked in with all the other tanks all over the country—an' everything you wanna know or see or hear, you punch for it an' you get it. Very convenient. Also it does math for you, an' keeps books, an' acts as consultin' chemist, physicist, astronomer, an' tea-leaf reader, with a "Advice to the Lovelorn" thrown in. The only thing it won't do is tell you exactly what your wife meant when she said, "Oh, you think so, do you?" in that peculiar kinda voice. Logics don't work good on women. Only on things that make sense.”

- Murray Leinster, A Logic Named Joe

For Sci-Fi fans: Baen Free Library.

Tuesday, October 14, 2008

TDM Rerun #11: Shared Pools

Sometimes we run into problems because of a Windows feature, and most of the time this is a very good feature, that a file mapping (an essential part of my shared memory implementation) cannot exist on its own. A file mapping, like mutexes, events, and other Windows primitives, must have an owner. If all processes associated with a given file mapping die, the file mapping will be destroyed. Because in my shared memory implementation this file mapping is backed with a page file, its contents won’t be preserved on an accessible part of the disk.

- Shared Pools, The Delphi Magazine 95, July 2003

Shared pool architectureThe shared pool was one of my more baroque creations. In fact, it was so complicated that it was never used in a deployed application. Basically, the article described an architecture to implement a pool of shared memory objects, which multiple Writers could use to send data to one Reader (typically sitting in another process). The system also handled cleanup when a Reader task died and other management details.

You really should not be playing with this code. There are better solutions.

Links: article (PDF, 193 KB), source code (ZIP, 1.9 MB)

Tuesday, October 07, 2008

OmniThreadLibrary: Using RTTI to call task methods

Yesterday I wrote an article on by-name and by-address invocations in the new OmniThreadLibrary (development version) but I didn’t finish the description of the OTL internal magic that makes those new calls work. Let’s fix that …

I ended the story right at the point where TOmniTaskExecutor.Asy_DispatchMessages calls DispatchOmniMessage.

Most of the magic happens inside this method, so it’s only fair to display it in its full glory.

procedure TOmniTaskExecutor.DispatchOmniMessage(msg: TOmniMessage);
var
methodAddr : pointer;
methodInfoObj : TObject;
methodInfo : TOmniInvokeInfo absolute methodInfoObj;
methodName : string;
methodSignature: TOmniInvokeType;
msgData : TOmniValue;
obj : TObject;
begin
if msg.MsgID = COtlReservedMsgID then begin
Assert(assigned(WorkerIntf));
GetMethodNameFromInternalMessage(msg, methodName, msgData);
if methodName = '' then
raise Exception.Create('TOmniTaskExecutor.DispatchOmniMessage: Method name not set');
if not assigned(oteMethodHash) then
oteMethodHash := TGpStringObjectHash.Create(17, true); //usually there won't be many methods
if not oteMethodHash.Find(methodName, methodInfoObj) then begin
GetMethodAddrAndSignature(methodName, methodAddr, methodSignature);
methodInfo := TOmniInvokeInfo.Create(methodAddr, methodSignature);
oteMethodHash.Add(methodName, methodInfo);
end;
case methodInfo.Signature of
itSelf:
TOmniInvokeSignature_Self(methodInfo.Address)(WorkerIntf.Implementor);
itSelfAndOmniValue:
TOmniInvokeSignature_Self_OmniValue(methodInfo.Address)(WorkerIntf.Implementor, msgData);
itSelfAndObject:
begin
obj := msgData.AsObject;
TOmniInvokeSignature_Self_Object(methodInfo.Address)(WorkerIntf.Implementor, obj);
end
else
RaiseInvalidSignature(methodName);
end; //case methodSignature
end
else
WorkerIntf.DispatchMessage(msg);
end; { TOmniTaskExecutor.DispatchMessage }

Now let’s dissect it. The code first checks if  the message ID contains the magic value. If not, the message is dispatched as before by using Delphi’s Dispatch.

  if msg.MsgID = COtlReservedMsgID then begin
//...
end
else
WorkerIntf.DispatchMessage(msg);
end; { TOmniTaskExecutor.DispatchMessage }

Not interesting. Let’s take a look at the other path – when msg.MsgID is an internal message ID.

In this case, DispatchMessages extracts the method name from the message. If the caller passed a method name to the Invoke, then the job is simple – it must only be extracted from the TOmniInternalAddressMsg object. If, on the other hand, method pointer was used, the code calls MethodName to convert it to a (who would guess?) method name.

procedure TOmniTaskExecutor.GetMethodNameFromInternalMessage(const msg: TOmniMessage; var
msgName: string; var msgData: TOmniValue);
var
internalType: TOmniInternalMessageType;
method : pointer;
begin
internalType := TOmniInternalMessage.InternalType(msg);
case internalType of
imtStringMsg:
TOmniInternalStringMsg.UnpackMessage(msg, msgName, msgData);
imtAddressMsg:
begin
TOmniInternalAddressMsg.UnpackMessage(msg, method, msgData);
msgName := WorkerIntf.Implementor.MethodName(method);
if msgName = '' then
raise Exception.CreateFmt('TOmniTaskExecutor.GetMethodNameFromInternalMessage: ' +
'Cannot find method name for method %p', [method]);
end
else
raise Exception.CreateFmt('TOmniTaskExecutor.GetMethodNameFromInternalMessage: ' +
'Internal message type %s is not supported',
[GetEnumName(TypeInfo(TOmniInternalMessageType), Ord(internalType))]);
end; //case internalType
end; { TOmniTaskExecutor.GetMethodNameFromInternalMessage }

Next, the code looks into an internal hash table and tries to fetch information on that method name.  If there’s no such information (when some method is called for the first time), GetMethodAddrAndSignature is called to convert the name to the method’s address and signature. Then this information is stored in the hash table so it will be immediately ready next time.

if not oteMethodHash.Find(methodName, methodInfoObj) then begin
GetMethodAddrAndSignature(methodName, methodAddr, methodSignature);
methodInfo := TOmniInvokeInfo.Create(methodAddr, methodSignature);
oteMethodHash.Add(methodName, methodInfo);
end;

Method signature is then used to select the type of call that will be performed. Only three signatures are supported: (Self: TObject), (Self: TObject; const msgData: TOmniValue) and (Self: TObject; var obj: TObject). Demo 18 demonstrates the use of all three. What I find especially convenient is that the third option allows you to put any TObject descendant in the parameter list. In other words, you can do:

type
TAsyncHello = class(TOmniWorker)
published
procedure TheAnswer(var sl: TStringList);
end;

procedure TfrmTestStringMsgDispatch.btnSendObjectClick(Sender: TObject);
var
sl: TStringList;
begin
sl := TStringList.Create;
sl.Text := '42';
FHelloTask.Invoke(@TAsyncHello.TheAnswer, sl);
end;

procedure TAsyncHello.TheAnswer(var sl: TStringList);
begin
FreeAndNil(sl);
end;

Convenient, huh?

RTTI

The above description of the DispatchOmniMessage looks very much like the famous Sydney Harris cartoon. In step one a method name is retrieved from the message. In step three the method is invoked using the right signature and method pointer. In step two, well …

Sydney Harris

Let’s be more explicit, then. We have a method name and we want to find the address for that method and some information about its parameters. Sounds like a job for RTTI, yes? Well, basic RTTI (the one that you enable with {$M+} or {$TYPEINFO ON} or simply by declaring a published property) only handles published properties, not methods. For methods, we need to use extended RTTI, which is enabled with {$METHODINFO ON}. I won’t describe it here as David Glassborow and Hallvard Vassbotn already did the job better than I could. If you’re interested in extended RTTI, I suggest that your research starts at Hallvard’s David Glassborow on extended RTTI article.

Great thanks for publishing all that info, guys, it helped a lot!

GetMethodAddrAndSignature first uses Delphi’s ObjAuto unit to extract method information header. Then it uses TObject’s MethodAddress to convert method name into address. Of course, it doesn’t use TObject directly, as it has no idea where the methods belonging to the task object are stored – it must call MethodAddress on your task object directly.

  methodInfoHeader := ObjAuto.GetMethodInfo(WorkerIntf.Implementor, methodName);
methodAddress := WorkerIntf.Implementor.MethodAddress(methodName);

After that, some sanity checks are run, which I won’t reprint here.

Then the method signature is checked. First we want to ensure that we’re dealing with a procedure, not a function.

  if assigned(methodInfoHeader.ReturnInfo.ReturnType) then
raise Exception.CreateFmt('TOmniTaskExecutor.DispatchMessage: ' +
'Method %s.%s must not return result',
[WorkerIntf.Implementor.ClassName, methodName]);

At the end, a messy fragment of code walks over the parameter description list for this method and checks that first parameter is a class reference (because this method is a part of a class, it has a hidden Self parameter at the beginning) and that the second parameter, if present at all, is either const data: TOmniValue or var obj: TObject (or descendant of TObject). Detected signature is then returned in the methodSignature parameter.

  // only limited subset of method signatures is allowed:
// (Self), (Self, const TOmniValue), (Self, var TObject)
headerEnd := cardinal(methodInfoHeader) + methodInfoHeader^.Len;
params := PParamInfo(cardinal(methodInfoHeader) + SizeOf(methodInfoHeader^)
- CShortLen + SizeOf(TReturnInfo) + Length(methodInfoHeader^.Name));
paramNum := 0;
methodSignature := itUnknown;
// Loop over the parameters
while cardinal(params) < headerEnd do begin
Inc(paramNum);
paramType := params.ParamType^;
if paramNum = 1 then
if (params^.Flags <> []) or (paramType^.Kind <> tkClass) then
RaiseInvalidSignature(methodName)
else
methodSignature := itSelf
else if paramNum = 2 then
//code says 'const' but GetMethodInfo says 'pfVar' :(
if (params^.Flags * [pfConst, pfVar] <> []) and (paramType^.Kind = tkRecord) and
(SameText(paramType^.Name, 'TOmniValue'))
then
methodSignature := itSelfAndOmniValue
else if (params^.Flags = [pfVar]) and (paramType^.Kind = tkClass) then
methodSignature := itSelfAndObject
else
RaiseInvalidSignature(methodName)
else
RaiseInvalidSignature(methodName);
params := params.NextParam;
end;

It looks messy and it is messy, but it does the job. If you have problems understanding this code, I’d recommend stepping over it with the debugger.

Benchmarks

In test 19 I implemented some benchmarking code to find out how much this approach is slower than the standard Comm.Send(msg, data). It turned out that not very much, thanks to the built-in caching.

image

Integer methods dispatching is about twice as fast as the string/pointer dispatching. That’s completely acceptable speed for something that is executed only few (thousand) times in the program’s lifetime. If your program model is based on sending millions and millions of do computation messages to the worker thread then it is doomed since the beginning anyway.

That would be enough for today, hope you liked it and stay well since the next time. Bye!

Monday, October 06, 2008

Erlangenizing the OmniThreadLibrary

Few days ago I “discovered” Erlang. [Such things happen when you read StackOverflow obsessively (sigh).] I started with Wikipedia and proceeded with the Pragmatic Programmer book – but that’s not really important. I wanted to talk about Erlang’s built-in concurrency support.

Some things are very similar to the OTL (if we take into account that Erlang is a functional language and Delphi is not) and some are different but one immediately jumped to my attention. In Erlang, the message recipient doesn’t use numeric code to discover what message it has received; instead of that, whole message is matched to a programmer provided pattern (or patterns). The interesting thing is that usually (by convention) the first element in the message is an atom (a name, a sequence of characters, if you want). That got me thinking … Why do we send integer messages in the OTL, anyway?

The Windows Way

From the very beginnings, OmniThreadLibrary tried to make programmer’s life simple. Well, at least simpler. One of the helping hands it offered was was simplified message processing. In the traditional Windows thread programming you have to wait for various objects to become signaled with WaitForMultipleObjects and then proceed accordingly. In practice that means that thread’s main logic is centralized in one very big method that handles all those events, mutexes and other stuff that can be waited upon.

OTL helps by implementing this logic internally (at least for workers that implement IOmniWorker interface). Instead of using kernel primitives, task owner sends messages to the task’s message queue. Messages are processed somewhere inside the OTL (specifically OtlTaskControl.pas/TOmniTaskExecutor.Asy_DispatchMessages) and are converted into method calls with Delphi’s Dispacth mechanism. That’s the same mechanism that makes sure that Windows messages are “converted” into Delphi methods and it requires that first two bytes of the dispatched message contain message ID. That’s why (until now) the recommended way to send a message to task was:

const
MSG_CHANGE_MESSAGE = 1;
MSG_SEND_MESSAGE = 2;

type
TAsyncHello = class(TOmniWorker)
strict private
aiMessage: string;
public
function Initialize: boolean; override;
procedure OMChangeMessage(var msg: TOmniMessage); message MSG_CHANGE_MESSAGE;
procedure OMSendMessage(var msg: TOmniMessage); message MSG_SEND_MESSAGE;
end;

FHelloTask: IOmniTaskControl;

FHelloTask.Comm.Send(MSG_CHANGE_MESSAGE, 'Random ' + IntToStr(Random(1234)));

[You can read more about this approach in OmniThreadLibrary Example #4: Bidirectional communication, the OTL way.]

The Erlang Way

This approach simplifies writing threaded code – at least the one that doesn’t depend heavily on shared data structures. But there’s still some room for improvement. For example, do we really have to use numeric messages, which have to be declared in advance. Why couldn’t the task controller just tell the task to execute the OMChangeMessage method?

To cut the long story short – this is now possible. Yesterday I committed a set of OTL modifications that allow you to do this:

type
TAsyncHello = class(TOmniWorker)
published
procedure Change(const data: TOmniValue);
end;

FHelloTask: IOmniTaskControl;

FHelloTask.Invoke('Change', 'Random ' + IntToStr(Random(1234)));

Yes, the Change method is published here. This is important.

Simple, huh? There’s a small problem, though – there are no compile-time checks. The code sends a string and compiler can do nothing to verify validity of this string. If the name was mistyped, you’d only notice it during the program execution.

To fix this problem, OTL allows another form of method invocation which uses a method address instead of  the name.

FHelloTask.Invoke(@TAsyncHello.Change, 'Random ' + IntToStr(Random(1234)));

In this case the compiler can check your typing, but still it won’t catch all problems – for example, the following code will compile and then raise exception during the execution.

procedure TfrmTestStringMsgDispatch.btnTestInvalidMsgClick(Sender: TObject);
begin
if cbStringMessages.Checked then
// will fail, FooBar method is not defined
FHelloTask.Invoke('FooBar')
else
// will fail, can only invoke methods from the task's class
FHelloTask.Invoke(@Self.btnTestInvalidMsg);
end;

[All new functionality is exposed in new demo 18_StringMsgDispatch.]

Implementation

To understand how the Invoke is implemented, it’s best to trace one such call. First we see that Invoke  gets converted into a normal message.

procedure TOmniTaskControl.Invoke(const msgMethod: pointer; msgData: TOmniValue);
begin
Comm.Send(TOmniInternalAddressMsg.CreateMessage(msgMethod, msgData));
end; { TOmniTaskControl.Invoke }

class function TOmniInternalAddressMsg.CreateMessage(const msgMethod: pointer; msgData:
TOmniValue): TOmniMessage;
begin
Result := TOmniMessage.Create(COtlReservedMsgID,
TOmniInternalAddressMsg.Create(msgMethod, msgData));
end; { TOmniInternalAddressMsg.CreateMessage }

This message has message ID COtlReservedMsgID (which is equal to $FFFF, so from now on please don’t use this message for you purposes). Message data field contains object which wraps method name and message data that was passed to the Invoke. Similar code is executed when Invoke is called with the method pointer parameter.

OK, so that’s how method name travels from the task controller to the task itself by using standard communication channel. But that is only half of the story … the simpler part!

On the receiving side, TOmniTaskExecutor.Asy_DispatchMessages detects new message and calls DispatchOmniMessage to process it.

if awaited = idxFirstMessage then
gotMsg := task.Comm.Receive(msg)
else begin
oteInternalLock.Acquire;
try
gotMsg := (oteCommList[awaited - idxFirstMessage - 1] as
IOmniCommunicationEndpoint).Receive(msg);
finally oteInternalLock.Release; end;
end;
if gotMsg and assigned(WorkerIntf) then
DispatchOmniMessage(msg);

Now that’s where the things start to get really interesting as we have to use RTTI and even extended RTTI to call the appropriate method. It is also the point where I’ll cut the story short. This article is already very long, maybe even too long, and I have much more to say on the subject. Expect part II to be published in few days. [Of course, if you’re curious you can just look into the code to see how DispatchOmniMessage is implemented!]

Test it!

The newest OmniThreadLibrary code is only available in the repository. No snapshots this time.

I’d still be immensely grateful to anybody that will test the new functionality and provide me with his thoughts on this approach.

Friday, October 03, 2008

Bulk update

Delphi 2009 was released so now it’s time to publish updates to my various units … In this article I’m just publishing the short changelog; you can expect more details on some enhancements (TGpJoinedStream, 4- and 8- aligned integer, Unicode support in GpStructuredStorage) in the near future.

DSiWin32 1.41

  • Compatible with Delphi 2009.
  • Created DSiInterlocked*64 family of functions by copying the code from http://qc.borland.com/wc/qcmain.aspx?d=6212. Functions were written by Will DeWitt Jr and are included with permission.
  • New functions DSiCopyFileAnimated, DSiConnectToNetworkResource, DSiIsHtmlFormatOnClipboard, DSiGetHtmlFormatFromClipboard, DSiCopyHtmlFormatToClipboard, DSiYield.
  • Added constants FILE_LIST_DIRECTORY, FILE_SHARE_FULL, FILE_ACTION_ADDED, FILE_ACTION_REMOVED, FILE_ACTION_MODIFIED, FILE_ACTION_RENAMED_OLD_NAME, FILE_ACTION_RENAMED_NEW_NAME.
  • Forced {$T-} as the code doesn't compile in {$T+} state.
  • Bug fixed: It was not possible to use DSiTimeGetTime64 in parallel from multiple threads

GpHugeFile 5.05a

  • Optimization: Under some circumstances, lots of unnecessary SetFilePointer calls were made.

GpLists 1.41

  • Works with Delphi 2009.

GpStreams 1.25

  • Added TGpJoinedStream class.
  • Span-storing class can now be modified via TGpScatteredStream.SpanClass.
  • TGpScatteredStream's AddSpan and AddSpanOS now return span offset in the span list.
  • Added bunch of BE_ overloads to the TGpStreamEnhancer class.
  • Small optimization in KeepStreamPositionWrapper destructor.

GpStructuredStorage 2.0

  • Works with Delphi 2009.
  • Added Unicode support to the underlying storage. File and folder names are stored in UTF-16, as are attribute names and values. API is still based on Delphi's 'string' type - meaning that values are converted to Unicode and back on the fly using the current locale.
  • Existing structured storage files will be upgraded automatically if they are not open for readonly access. Applications, compiled with GpStructuredStorage 1.x will not be able to read new/upgraded files. Version is incremented to 2.0.0.0 when 1.x file is opened unless it is opened in readonly mode. Newly created storages have version 2.0.0.0.

GpStuff 1.13

  • Implemented 4-aligned integer, TGp4AlignedInt.
  • Implemented 8-aligned integer, TGp8AlignedInt.
  • Added function OpenArrayToVarArray, written by Thomas Schubbauer.
  • ReverseWord/ReverseDWord rewritten in assembler (by GJ).
  • Declared MaxInt64 constant.

GpSync 1.21

  • Added optional external message counter to the message queue.

GpTextFile 4.01

  • Added TGpTextFile.Write(ws: WideString) and TGpTextFile.Writeln(s: string) overloads.
  • TGpTextFile.Write[ln] string parameters made 'const'.
  • Bug fixed [found by AKi]: TGpTextFile.Write was not working when using CP_UTF8 codepage.

GpTextStream 1.06

  • Works with Delphi 2009.
  • Exported StringToWideString, WideStringToString, and GetDefaultAnsiCodepage.

GpVersion 2.02

  • Added another CreateVersion overload.
  • Extended IVersion interface with IsHigherThan and IsLowerThan.