
Multithreaded queue template

We needed to add a large number of records to a SQL Server database. Each record needed some heavy pre-processing before it was inserted and each insert took a relatively long time, yet neither the server nor the client was CPU bound.

Someone suggested having multiple threads do the inserts, so we tried it, and overall processing time fell by around 60-70%. Quite a saving!

I'd used the code contained in this template class before but it was specialised for that solution. I decided to have a hack to see if I could coerce it into a useful template queue. The results are quite useful.

Description

The class is quite flexible. It allows you to create a simple class that will handle job processing (a job handler). Your class will be told when the thread starts, when it finishes, and of course when to process a job. Because the handler is a class, we have the flexibility to create resources before any items are processed and to release them once all processing is done.

The job handler class does not need any synchronisation to process the queue and would look something like this:


class CObjectJobHandler
{
public:
	bool OnThreadStart()
	{
		printf( "OnThreadStart\n" );
		return true;
	}

	bool OnThreadStop()
	{
		printf( "OnThreadStop\n" );
		return true;
	}

	bool OnProcessItem( CObject *pObject )
	{
		printf( "Processing object\n" );
		delete pObject;
		return true;
	}
};
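
To make the order of those three callbacks concrete, here is a minimal sketch of how a single worker thread might drive a job handler. It uses standard C++ threads rather than the real queue: MiniQueue, CLoggingJobHandler and RunLifecycleDemo are hypothetical names made up for this illustration, and nothing here is taken from the GSMT::CQueue source. Note also that this sketch finishes every queued job before it shuts down, which is an assumption of the sketch and not the behaviour of the real queue.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <string>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical single-worker queue used only to show the handler lifecycle.
// It is NOT GSMT::CQueue.
template< typename Handler, typename Job >
class MiniQueue
{
public:
	explicit MiniQueue( Handler &handler )
		: m_handler( handler )
		, m_stop( false )
		, m_worker( &MiniQueue::Run, this )
	{
	}

	// Assumption of this sketch: drain every queued job, then join the worker.
	~MiniQueue()
	{
		{
			std::lock_guard< std::mutex > lock( m_mutex );
			m_stop = true;
		}
		m_cv.notify_one();
		m_worker.join();
	}

	void AddJob( Job job )
	{
		{
			std::lock_guard< std::mutex > lock( m_mutex );
			m_jobs.push_back( std::move( job ) );
		}
		m_cv.notify_one();
	}

private:
	void Run()
	{
		if( !m_handler.OnThreadStart() )
			return;
		for( ;; )
		{
			Job job;
			{
				std::unique_lock< std::mutex > lock( m_mutex );
				m_cv.wait( lock, [this] { return m_stop || !m_jobs.empty(); } );
				if( m_jobs.empty() )
					break;	// stop requested and nothing left to do
				job = std::move( m_jobs.front() );
				m_jobs.pop_front();
			}
			// Called outside the lock, and only ever from this one thread,
			// which is why the handler itself needs no locking.
			m_handler.OnProcessItem( job );
		}
		m_handler.OnThreadStop();
	}

	Handler &m_handler;
	std::mutex m_mutex;
	std::condition_variable m_cv;
	std::deque< Job > m_jobs;
	bool m_stop;
	std::thread m_worker;	// declared last so it starts after the other members exist
};

// Hypothetical handler that records each callback instead of printing it.
struct CLoggingJobHandler
{
	std::vector< std::string > m_log;
	bool OnThreadStart() { m_log.push_back( "start" ); return true; }
	bool OnThreadStop() { m_log.push_back( "stop" ); return true; }
	bool OnProcessItem( const std::string &job ) { m_log.push_back( "job:" + job ); return true; }
};

std::vector< std::string > RunLifecycleDemo()
{
	CLoggingJobHandler handler;
	{
		MiniQueue< CLoggingJobHandler, std::string > queue( handler );
		queue.AddJob( "a" );
		queue.AddJob( "b" );
	}	// the destructor drains the queue and joins the worker
	assert( handler.m_log.front() == "start" );
	return handler.m_log;
}
```

Calling RunLifecycleDemo() from main returns the log "start", "job:a", "job:b", "stop". The handler is only ever touched by its own worker thread, so all the locking lives in the queue.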

By default the queue will manage creation and destruction of job handlers, but you can override its default behaviour.

Sometimes we want control over how a handler is created. For example, we may want to pass construction parameters to it, or we may want only one job handler for all threads. An example of using a single job handler looks like this:


class CObjectJobHandlerManager
{
public:
	CObjectJobHandlerManager()
	{
		printf( "CObjectJobHandlerManager created\n" );
	}

	CObjectJobHandler *Create()
	{
		return &m_jobhandler;
	}

	void Destroy( CObjectJobHandler *p )
	{
		//	Do nothing: the single shared handler is a member and is
		//	destroyed along with the manager
	}

	CObjectJobHandler m_jobhandler;
};
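
The other case mentioned above, passing construction parameters to each handler, might look like the sketch below. CPrefixJobHandler, CPrefixJobHandlerManager and RunManagerDemo are hypothetical names invented for this example, and Destroy is assumed to receive the same pointer type that Create returns. How a manager is handed to the queue is not covered in this article, so the sketch only exercises Create and Destroy directly.

```cpp
#include <atomic>
#include <cassert>
#include <cstdio>
#include <string>

// Hypothetical job handler that needs a constructor argument.
class CPrefixJobHandler
{
public:
	explicit CPrefixJobHandler( const std::string &prefix ) : m_prefix( prefix ) {}

	bool OnThreadStart() { return true; }
	bool OnThreadStop() { return true; }

	bool OnProcessItem( const char *pcsz )
	{
		printf( "%s: %s\n", m_prefix.c_str(), pcsz );
		return true;
	}

	const std::string &Prefix() const { return m_prefix; }

private:
	std::string m_prefix;
};

// Hypothetical manager: every Create() builds a fresh handler with the
// stored parameter and every Destroy() deletes one.
class CPrefixJobHandlerManager
{
public:
	explicit CPrefixJobHandlerManager( const std::string &prefix )
		: m_prefix( prefix )
		, m_live( 0 )
	{
	}

	CPrefixJobHandler *Create()
	{
		++m_live;
		return new CPrefixJobHandler( m_prefix );
	}

	void Destroy( CPrefixJobHandler *p )
	{
		delete p;
		--m_live;
	}

	// Number of handlers created and not yet destroyed.
	int LiveCount() const { return m_live.load(); }

private:
	std::string m_prefix;
	// Atomic on the assumption that several threads may create handlers at once.
	std::atomic< int > m_live;
};

int RunManagerDemo()
{
	CPrefixJobHandlerManager manager( "worker" );
	CPrefixJobHandler *a = manager.Create();
	CPrefixJobHandler *b = manager.Create();
	assert( a != b );	// one handler per call, unlike the shared-handler manager
	assert( a->Prefix() == "worker" );
	a->OnProcessItem( "job 1" );
	b->OnProcessItem( "job 2" );
	manager.Destroy( a );
	manager.Destroy( b );
	return manager.LiveCount();
}
```

RunManagerDemo() returns 0 once both handlers have been handed back, which is a cheap way of checking that every Create is matched by a Destroy.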

Also, if you are adding newly created items to the queue (for example, m_queue.AddJob( new CObject );) you'll want some way of freeing the items that go unprocessed if the queue is cancelled. For this we provide a template function, and you can supply a version of it for your own item type like this:


namespace GSMT {
	void ProcessUnprocessedJobs( std::vector< CObject * > &arrItem )
	{
		for( UINT u = 0; u < arrItem.size(); u++ )
		{
			delete arrItem[ u ];
		}
	}
}

This gives you full control over creation and destruction of queued jobs.

When the queue goes out of scope it will end all processing. This can mean that some jobs are never finished. If that isn't what you want, don't let the queue go out of scope until there are no more jobs on it.
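
The effect of cancelling part-way through can be shown with a small, deterministic sketch in standard C++. CancellableQueue, RequestStop, TakeUnprocessed and RunCancelDemo are hypothetical names for this illustration only and are not part of the GSMT queue. The sketch also simplifies things by letting the worker exit as soon as the queue is empty, and it uses plain int jobs so there is nothing to free.

```cpp
#include <cassert>
#include <deque>
#include <future>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical single-worker queue; NOT GSMT::CQueue.
class CancellableQueue
{
public:
	CancellableQueue() : m_stop( false ) {}

	~CancellableQueue()
	{
		RequestStop();
		if( m_worker.joinable() )
			m_worker.join();
	}

	void AddJob( int job )
	{
		std::lock_guard< std::mutex > lock( m_mutex );
		m_jobs.push_back( job );
	}

	// Start the worker; process( job ) runs once per job on the worker thread.
	template< typename Fn >
	void Start( Fn process )
	{
		m_worker = std::thread( [this, process] {
			for( ;; )
			{
				int job;
				{
					std::lock_guard< std::mutex > lock( m_mutex );
					if( m_stop || m_jobs.empty() )
						return;	// cancelled, or nothing left to do
					job = m_jobs.front();
					m_jobs.pop_front();
				}
				process( job );
			}
		} );
	}

	// Ask the worker to stop once its current job has finished.
	void RequestStop()
	{
		std::lock_guard< std::mutex > lock( m_mutex );
		m_stop = true;
	}

	// Wait for the worker, then return the jobs that were never started.
	std::vector< int > TakeUnprocessed()
	{
		if( m_worker.joinable() )
			m_worker.join();
		return std::vector< int >( m_jobs.begin(), m_jobs.end() );
	}

private:
	std::mutex m_mutex;
	std::deque< int > m_jobs;
	bool m_stop;
	std::thread m_worker;
};

std::vector< int > RunCancelDemo()
{
	CancellableQueue queue;
	for( int i = 1; i <= 3; ++i )
		queue.AddJob( i );

	std::promise< void > started, release;
	std::shared_future< void > gate = release.get_future().share();
	bool first = true;	// only touched on the worker thread

	queue.Start( [&]( int ) {
		if( first )
		{
			first = false;
			started.set_value();	// tell the caller that job 1 is under way
			gate.wait();		// hold job 1 until the stop has been requested
		}
	} );

	started.get_future().wait();
	queue.RequestStop();
	release.set_value();		// let job 1 finish
	return queue.TakeUnprocessed();	// jobs 2 and 3 were never started
}
```

RunCancelDemo() returns the two jobs that were still waiting when the stop was requested. With heap-allocated items, that leftover list is exactly what a clean-up function has to free.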

Typical usage

Below is a typical, simplistic usage. You can see more in MTQueueTest.cpp in the ZIP file, which demonstrates all of the methods and includes some tests to ensure the class is type neutral.

class CQueueObjectString
{
public:
	bool OnThreadStart()
	{
		printf( "OnThreadStart\n" );
		return true;
	}

	bool OnThreadStop()
	{
		printf( "OnThreadStop\n" );
		return true;
	}

	bool OnProcessItem( LPCTSTR pcsz )
	{
		printf( "Processing %s\n", pcsz );
		return true;
	}
};

int main()
{
	GSMT::CQueue< CQueueObjectString, LPCTSTR > m_queue( 5 );

	m_queue.AddJob( "string 1" );

	while( m_queue.GetWaitingJobsCount() )
	{
		printf( "Waiting for jobs to finish\n");
		if( _kbhit() && _getch() == 27 )
		{
			m_queue.Stop();
			break;
		}
		Sleep( 1000 );
	}
}

Licence

This class uses the LGPL.