Thursday, October 20, 2011

OmniThreadLibrary in Practice [1]–Web Download and Database Storage

From time to time I get a question on the OmniThreadLibrary forum that could be of interest to other OmniThreadLibrary users. As I believe that many of you don’t follow that forum, I have decided to repost such questions on the blog (and provide answers, of course).

The first question in this series was asked by GoustiFruit:

I need to download a list of web pages, extract data on them and then store these data in a SQLite database. The downloading/extracting part will happen in multiple threads (I'm using Synapse), but querying the database needs to be done asynchronously as I can only have one concurrent access to it.
So I'd like to know how (conceptually) I could implement that? My first idea is to run one single thread for querying the DB, run several threads for each Url to download/analyse and then exchange messages between these threads and the querying thread, with the extracted data as a parameter: does it make sense or am I totally wrong? I also read things about the connection pool concept but I'm not sure if it applies when only one connection is allowed at one time?

To start at the end: no, there’s no need for a connection pool if there’s only one connection to the database. Even if there were more database writers, those would be long-term tasks and as such each would establish its own connection to the database. The connection pool concept is more appropriate for situations where there are many short-term tasks, each requiring its own database connection (a situation that typically occurs in servers that respond to client requests).

High-level solution

My first idea was to create a pipeline with two stages: multiple HTTP retrievers in the first stage and one database writer in the second stage. Many tasks can access the web at the same time, possibly even more tasks than there are cores, as those tasks mostly block while waiting for the web server to respond.

The first stage, Retriever, fetches the contents of one page. If the page is fetched correctly, a page description object (TPage, not shown in this demo) is created and sent to the output pipeline. As each input generates at most one output, this stage is implemented as a simple stage.

The second stage, Inserter, is implemented as a normal stage (so it has to loop internally over all input data). First it establishes a connection to the database, then it loops over all input values (that is, over all successfully retrieved pages) and inserts each result into the database, and finally it closes the database connection.

The main method (ParallelWebRetriever) first sets up and starts the pipeline. Next it feeds the URLs to be retrieved into the pipeline's input and closes the input. Finally, it waits for the pipeline to complete.

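uses
  Windows,   // INFINITE
  SysUtils,  // FreeAndNil
  Classes,   // TStrings
  OtlCommon,
  OtlCollections,
  OtlParallel;

// TPage, the page description class, is not shown in this demo.

function HttpGet(const url: string; var page: string): boolean;
begin
  // retrieve page contents from the url; return False if page is not accessible
  Result := False; // placeholder until a real implementation is added
end;

// First stage (simple stage): called once per URL. When the page cannot be
// retrieved, output is left empty and nothing is passed to the next stage.
procedure Retriever(const input: TOmniValue; var output: TOmniValue);
var
  pageContents: string;
begin
  if HttpGet(input.AsString, pageContents) then
    output := TPage.Create(input.AsString, pageContents);
end;

// Second stage (normal stage): one task that loops over all retrieved pages
// and owns the only database connection.
procedure Inserter(const input, output: IOmniBlockingCollection);
var
  page   : TOmniValue;
  pageObj: TPage;
begin
  // connect to database
  for page in input do begin
    pageObj := TPage(page.AsObject);
    // insert pageObj into database
    FreeAndNil(pageObj);
  end;
  // close database connection
end;

procedure ParallelWebRetriever(urlList: TStrings);
var
  pipeline: IOmniPipeline;
  s       : string;
begin
  // set up pipeline: twice as many retrievers as cores (they mostly wait
  // for the network), followed by a single inserter
  pipeline := Parallel.Pipeline
    .Stage(Retriever).NumTasks(Environment.Process.Affinity.Count * 2)
    .Stage(Inserter)
    .Run;
  // insert URLs to be retrieved
  for s in urlList do
    pipeline.Input.Add(s);
  pipeline.Input.CompleteAdding;
  // wait for pipeline to complete
  pipeline.WaitFor(INFINITE);
end;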
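Both versions use a TPage class that the post deliberately leaves out. If you want the examples to compile, a minimal stand-in could look like the sketch below. The unit name PageData and the Url and Contents properties are hypothetical; a real page description object would hold whatever data you extract from the page rather than the raw HTML.

```pascal
unit PageData; // hypothetical unit name

interface

type
  // Hypothetical page description object: just the URL and the raw contents.
  TPage = class
  private
    FUrl     : string;
    FContents: string;
  public
    constructor Create(const url, contents: string);
    property Url: string read FUrl;
    property Contents: string read FContents;
  end;

implementation

constructor TPage.Create(const url, contents: string);
begin
  inherited Create;
  FUrl := url;
  FContents := contents;
end;

end.
```

Note that the retriever creates each TPage and the inserter frees it after storing it, so ownership of the object travels through the pipeline together with the reference.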

Low-level solution

The low-level solution is put together in a similar manner: there are multiple retriever tasks and one database inserter task. Of course, it requires much more code.

An interesting trick, commonly used in the OtlParallel unit, is the use of an IOmniResourceCount to detect the last running retriever task, which must close the output collection so that the database inserter knows it has to stop.
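The idea behind this trick does not depend on OmniThreadLibrary. The following sketch is a plain-Delphi illustration, not OmniThreadLibrary code: it swaps IOmniResourceCount for an integer counter decremented with InterlockedDecrement from the Windows unit, and replaces the call to CompleteAdding with a second counter, just to show that exactly one of several workers ends up doing the closing. The names CNumWorkers, workersLeft and closedCounter are made up for the example.

```pascal
program LastOneOut;

{$APPTYPE CONSOLE}

uses
  Windows,  // InterlockedDecrement, InterlockedIncrement
  Classes;  // TThread

const
  CNumWorkers = 4;

var
  workersLeft  : integer = CNumWorkers; // plays the role of IOmniResourceCount
  closedCounter: integer = 0;           // counts workers that "closed the output"

type
  TWorker = class(TThread)
  protected
    procedure Execute; override;
  end;

procedure TWorker.Execute;
begin
  // ... the real work (retrieving pages) would happen here ...
  // InterlockedDecrement returns the decremented value, so exactly one worker sees 0.
  if InterlockedDecrement(workersLeft) = 0 then
    InterlockedIncrement(closedCounter); // real code: output.CompleteAdding
end;

var
  workers: array [1..CNumWorkers] of TWorker;
  i      : integer;

begin
  for i := Low(workers) to High(workers) do
    workers[i] := TWorker.Create(False); // start immediately
  for i := Low(workers) to High(workers) do begin
    workers[i].WaitFor;
    workers[i].Free;
  end;
  Writeln('Workers that closed the output: ', closedCounter); // prints 1
end.
```

However the threads are scheduled, the counter passes through zero exactly once, so the output is closed exactly once and only after every worker has finished adding its results.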

uses
  Windows,   // INFINITE
  SysUtils,  // Format, FreeAndNil
  Classes,   // TStrings
  OtlCommon,
  OtlSync,
  OtlTask,
  OtlTaskControl,
  OtlCollections;

// TPage, the page description class, is not shown in this demo.

function HttpGet(const url: string; var page: string): boolean;
begin
  // retrieve page contents from the url; return False if page is not accessible
  Result := False; // placeholder until a real implementation is added
end;

procedure Retriever(const task: IOmniTask);
var
  input       : IOmniBlockingCollection;
  output      : IOmniBlockingCollection;
  pageContents: string;
  param       : TOmniValue;
  taskCount   : IOmniResourceCount;
  url         : TOmniValue;
begin
  param := task.Param['Input'];
  input := param.AsInterface as IOmniBlockingCollection;
  param := task.Param['Output'];
  output := param.AsInterface as IOmniBlockingCollection;
  param := task.Param['TaskCount'];
  taskCount := param.AsInterface as IOmniResourceCount;
  // process URLs until the main thread calls input.CompleteAdding
  for url in input do begin
    if HttpGet(url.AsString, pageContents) then
      output.Add(TPage.Create(url.AsString, pageContents));
  end;
  // the last retriever to finish closes the inserter's input
  if taskCount.Allocate = 0 then
    output.CompleteAdding;
end;

procedure Inserter(const task: IOmniTask);
var
  input  : IOmniBlockingCollection;
  page   : TOmniValue;
  pageObj: TPage;
  param  : TOmniValue;
begin
  param := task.Param['Input'];
  input := param.AsInterface as IOmniBlockingCollection;
  // connect to database
  for page in input do begin
    pageObj := TPage(page.AsObject);
    // insert pageObj into database
    FreeAndNil(pageObj);
  end;
  // close database connection
end;

procedure ParallelWebRetriever(urlList: TStrings);
var
  input         : IOmniBlockingCollection;
  inserterTask  : IOmniTaskControl;
  iTask         : integer;
  numRetrievers : integer;
  retrieverCount: IOmniResourceCount;
  s             : string;
  sqlInput      : IOmniBlockingCollection;
begin
  numRetrievers := Environment.Process.Affinity.Count * 2;
  input := TOmniBlockingCollection.Create;
  sqlInput := TOmniBlockingCollection.Create;
  // run inserter
  inserterTask := CreateTask(Inserter, 'Inserter task')
    .SetParameter('Input', sqlInput)
    .Run;
  retrieverCount := CreateResourceCount(numRetrievers);
  // run retrievers
  for iTask := 1 to numRetrievers do begin
    CreateTask(Retriever, Format('Retriever task #%d', [iTask]))
      .SetParameter('Input', input)
      .SetParameter('Output', sqlInput)
      .SetParameter('TaskCount', retrieverCount)
      .Unobserved
      .Run;
  end;
  // insert URLs to be retrieved
  for s in urlList do
    input.Add(s);
  input.CompleteAdding;
  // wait for pipeline to complete; the inserter stops once the last
  // retriever has closed sqlInput
  inserterTask.WaitFor(INFINITE);
  inserterTask.Terminate;
end;
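HttpGet is only a stub in both versions. Since the question mentions Synapse, one possible implementation with the same parameters uses Synapse's THTTPSend class from the httpsend unit. This is a sketch under a few assumptions: any status code other than 200 counts as "not accessible", the page is decoded with TStringList's default encoding (the page's declared charset is ignored), and HTTPS URLs would additionally need one of Synapse's SSL plugin units, which is not shown here.

```pascal
uses
  Classes,   // TStringList
  httpsend;  // Synapse: THTTPSend

function HttpGet(const url: string; var page: string): boolean;
var
  http    : THTTPSend;
  response: TStringList;
begin
  Result := False;
  http := THTTPSend.Create;
  try
    // HTTPMethod returns False on connection errors; ResultCode holds the HTTP status
    if http.HTTPMethod('GET', url) and (http.ResultCode = 200) then begin
      response := TStringList.Create;
      try
        response.LoadFromStream(http.Document); // Document holds the response body
        page := response.Text;
        Result := True;
      finally
        response.Free;
      end;
    end;
  finally
    http.Free;
  end;
end;
```

Each call creates its own THTTPSend instance, so the retriever tasks running in parallel do not share any connection state.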

3 comments:

  1. Thanks @gabr!

    I like how you demonstrated the versatility and flexibility of your OTL framework: it can be used from either a high-level or a low-level perspective.

    Good work and a great testament to you.

  2. Joseph, 08:46

    Nice example. Will the speed/performance be the same with the high-level and the low-level approach?

    Replies
    1. Yes, performance will be comparable in both cases.
