From time to time I get a question on the OmniThreadLibrary forum that could be of interest to other OmniThreadLibrary users. As I believe that many of you don’t follow that forum I have decided to repost such questions on the blog (and provide answers, of course).
The first question in this series was asked by GoustiFruit:
I need to download a list of web pages, extract data on them and then store these data in a SQLite database. The downloading/extracting part will happen in multiple threads (I'm using Synapse), but querying the database needs to be done asynchronously as I can only have one concurrent access to it.
So I'd like to know how (conceptually) I could implement that ? My first idea is to run one single thread for querying the DB, run several threads for each Url to download/analyse and then exchange messages between these threads and the querying thread, with the extracted data as a parameter: does it make sense or am I totally wrong? I also read things about the connection pool concept but I'm not sure if it applies when only one connection is allowed at one time?
To start at the end – No, there’s no need for connection pool if there’s only one connection to the database. Even if there were more database writers, those would be long-term task and as such each would establish its own connection to the database. Connection pool concept is more appropriate for situations where there’s plenty of short-term tasks that each require its own database connection (a situation that typically occurs in servers that respond to client requests).
My first idea was to create a pipeline with two stages – multiple http retrievers in the first stage and one database writer in the second stage. Plenty of tasks could be accessing web at the same time - maybe even more then there are cores as those tasks would mostly block while accessing the web.
First stage, Retriever, fetches contents of one page. If the page is fetched correctly, page description object (not shown in this demo) is created and sent to the output pipeline. As there can be at most one output generated for each input, this stage is implemented as a simple stage.
Inserter is implemented as a normal stage (so it has to loop internally over all input data). First it establishes a connection to the database, then it loops over all input values (over all successfully retrieved pages) and inserts each result into the database, and at the end it closes the database connection.
Main method (ParallelWebRetriever) first sets up and starts the pipeline. Next it feeds URLs to be retrieved into the input pipeline and closes the input pipeline. At the end it waits for the pipeline to complete.
uses OtlCommon, OtlCollections, OtlParallel; function HttpGet(url: string; var page: string): boolean; begin // retrieve page contents from the url; return False if page is not accessible end; procedure Retriever(const input: TOmniValue; var output: TOmniValue); var pageContents: string; begin if HttpGet(input.AsString, pageContents) then output := TPage.Create(input.AsString, pageContents); end; procedure Inserter(const input, output: IOmniBlockingCollection); var page : TOmniValue; pageObj: TPage; begin // connect to database for page in input do begin pageObj := TPage(page.AsObject); // insert pageObj into database FreeAndNil(pageObj); end; // close database connection end; procedure ParallelWebRetriever; var pipeline: IOmniPipeline; s : string; urlList : TStringList; begin // set up pipeline pipeline := Parallel.Pipeline .Stage(Retriever).NumTasks(Environment.Process.Affinity.Count * 2) .Stage(Inserter) .Run; // insert URLs to be retrieved for s in urlList do pipeline.Input.Add(s); pipeline.Input.CompleteAdding; // wait for pipeline to complete pipeline.WaitFor(INFINITE); end;
Low-level solution is put together in a similar manner – there are multiple retriever tasks and one database inserter task. Of course, there’s much more code here.
An interesting trick, which is commonly used in the OtlParallel unit, is the use of a IOmniResourceCount to detect last running retriever task (which must close the output pipe so that the database inserter would know that it has to stop).
uses OtlCommon, OtlSync, OtlTask, OtlTaskControl, OtlCollections; function HttpGet(url: string; var page: string): boolean; begin // retrieve page contents from the url; return False if page is not accessible end; procedure Retriever(const task: IOmniTask); var input : IOmniBlockingCollection; output : IOmniBlockingCollection; pageContents: string; param : TOmniValue; taskCount : IOmniResourceCount; url : TOmniValue; begin param := task.Param['Input']; input := param.AsInterface as IOmniBlockingCollection; param := task.Param['Output']; output := param.AsInterface as IOmniBlockingCollection; param := task.Param['TaskCount']; taskCount := param.AsInterface as IOmniResourceCount; for url in input do begin if HttpGet(url, pageContents) then output.Add(TPage.Create(url.AsString, pageContents)); end; if taskCount.Allocate = 0 then output.CompleteAdding; end; procedure Inserter(const task: IOmniTask); var input : IOmniBlockingCollection; page : TOmniValue; pageObj: TPage; param : TOmniValue; begin param := task.Param['Input']; input := param.AsInterface as IOmniBlockingCollection; // connect to database for page in input do begin pageObj := TPage(page.AsObject); // insert pageObj into database FreeAndNil(pageObj); end; // close database connection end; procedure ParallelWebRetriever; var input : IOmniBlockingCollection; inserterTask : IOmniTaskControl; iTask : integer; numRetrievers : integer; retrieverCount: IOmniResourceCount; s : string; sqlInput : IOmniBlockingCollection; begin numRetrievers := Environment.Process.Affinity.Count * 2; input := TOmniBlockingCollection.Create; sqlInput := TOmniBlockingCollection.Create; // run inserter inserterTask := CreateTask(Inserter, 'Inserter task') .SetParameter('Input', sqlInput) .Run; retrieverCount := CreateResourceCount(numRetrievers); // run retrievers for iTask := 1 to numRetrievers do begin CreateTask(Retriever, Format('Retriever task #%d', [iTask])) .SetParameter('Input', input) .SetParameter('Output', sqlInput) .SetParameter('TaskCount', retrieverCount) .Unobserved .Run; end; // insert URLs to be retrieved for s in urlList do input.Add(s); input.CompleteAdding; // wait for pipeline to complete inserterTask.WaitFor(INFINITE); inserterTask.Terminate; end;