Fastest Way to Insert in Parallel to a Single Table

Fastest way to insert in parallel to a single table

Have you read "Load 1TB in less than 1 hour"?

  1. Run as many load processes as you have available CPUs. If you have
    32 CPUs, run 32 parallel loads. If you have 8 CPUs, run 8 parallel
    loads.
  2. If you have control over the creation of your input files, make them
    of a size that is evenly divisible by the number of load threads you
    want to run in parallel. Also make sure all records belong to one
    partition if you want to use the switch partition strategy.
  3. Use BULK INSERT instead of BCP if you are running the process on the
    SQL Server machine.
  4. Use table partitioning to gain another 8-10%, but only if your input
    files are GUARANTEED to match your partitioning function, meaning
    that all records in one file must be in the same partition.
  5. Use TABLOCK to avoid row-at-a-time locking.
  6. Use ROWS_PER_BATCH = 2500, or something near this, if you are
    importing multiple streams into one table (see the sketch after
    this list).
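
As a sketch of items 3, 5, and 6 combined (the table name, file path, and terminators are placeholders for this example):

BULK INSERT dbo.TargetTable
FROM 'D:\loads\chunk_01.dat'
WITH (
    TABLOCK,                -- item 5: bulk-update table lock instead of row-at-a-time locks
    ROWS_PER_BATCH = 2500,  -- item 6
    FIELDTERMINATOR = '|',
    ROWTERMINATOR = '\n'
);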

For SQL Server 2008, there are certain circumstances where you can utilize minimal logging for a standard INSERT SELECT:

SQL Server 2008 expands the set of operations that it can handle with
minimal logging. It supports minimally logged regular INSERT SELECT
statements. In addition, turning on trace flag 610 lets SQL Server
2008 apply minimal logging against a nonempty B-tree for new key
ranges that cause allocations of new pages.
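
A sketch of that scenario (table names are hypothetical); note that a minimally logged INSERT SELECT also requires the TABLOCK hint on the target:

-- Trace flag 610 is instance-wide; evaluate it carefully before enabling in production.
DBCC TRACEON (610, -1);

-- With TABLOCK, SQL Server 2008 can minimally log this INSERT...SELECT.
INSERT INTO dbo.TargetTable WITH (TABLOCK)
SELECT *
FROM dbo.StagingTable;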

How to correctly make inserts in a parallel loop

The slow part of your process is reading the data from the file. Much of the time your program has to wait idly for the hard disk to provide a chunk of data. Instead of waiting idly, your program could already do some processing of the items it has fetched.

Whenever you have a program where your process has to wait for some external process, like writing to a disk, querying a database management system, or fetching information from the internet, it is wise to consider using async-await.

If you use async-await, then, whenever your process has to wait for some other process to finish, it won't wait idly, but will look around to see if it can do something else instead.

In your case, you could call an async function that asynchronously reads one file and asynchronously writes the read data to the database. If you start several of these Tasks, then whenever one of them has to await the result of either the file read or the database write, it can look around to see if it can do anything for the other Tasks. So while Task A is waiting for a chunk of data from file X, Task B could already be writing data to the database.

As we are processing the file line by line, we need a function that returns an IEnumerable<string>, or rather its async equivalent: IAsyncEnumerable<string>. See iterating with AsyncEnumerable.

public async IAsyncEnumerable<string> ReadLines(string fileName)
{
    using (StreamReader reader = File.OpenText(fileName))
    {
        while (!reader.EndOfStream)
        {
            yield return await reader.ReadLineAsync().ConfigureAwait(false);
        }
    }
}

File.OpenText sadly only allows synchronous I/O; the async APIs are implemented poorly in that scenario. To open a file for truly asynchronous I/O, you need one of the FileStream constructor overloads that take a Boolean isAsync parameter or a FileOptions.Asynchronous value.
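
As an illustration, here is a variant built on such a FileStream (the method name, buffer size, and share mode are choices made for this sketch, not prescribed by the original):

public async IAsyncEnumerable<string> ReadLinesTrulyAsync(string fileName)
{
    // FileOptions.Asynchronous makes the underlying file handle truly asynchronous
    var stream = new FileStream(fileName, FileMode.Open, FileAccess.Read,
        FileShare.Read, bufferSize: 4096, options: FileOptions.Asynchronous);
    using (StreamReader reader = new StreamReader(stream))
    {
        while (!reader.EndOfStream)
        {
            yield return await reader.ReadLineAsync().ConfigureAwait(false);
        }
    }
}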

Usage:

async Task DisplayFileContentsAsync(string fileName)
{
    await foreach (string line in ReadLines(fileName))
    {
        Console.WriteLine(line);
    }
}

We also need a method that writes the read data to the database. I do it here line by line; if you want, you could change it so that it writes several lines at once.

async Task SaveInDbAsync(string line, string dbConnectionString)
{
    using (SqlConnection dbConnection = new SqlConnection(dbConnectionString))
    {
        // prepare the SQL command (consider using other methods)
        const string sqlCommandText = @"Insert into ...";
        var dbCommand = dbConnection.CreateCommand();
        dbCommand.CommandText = sqlCommandText;
        dbCommand.Parameters.Add(...);

        // async execute the dbCommand
        await dbConnection.OpenAsync();
        await dbCommand.ExecuteNonQueryAsync();
        // TODO: consider using the return value to detect problems
    }
}

Put it all together: read one file and save the lines in the database:

async Task SaveFileInDbAsync(string fileName, string dbConnectionString)
{
    await foreach (string line in ReadLines(fileName))
    {
        await SaveInDbAsync(line, dbConnectionString);
    }
}

To save all your files:

async Task SaveFilesInDbAsync(IEnumerable<string> fileNames, string dbConnectionString)
{
    // start all Tasks, do not await yet:
    List<Task> tasks = new List<Task>();
    foreach (string fileName in fileNames)
    {
        Task task = SaveFileInDbAsync(fileName, dbConnectionString);
        tasks.Add(task);
    }

    // now that all Tasks are started and happily reading files
    // and writing the read lines to the database,
    // await until all Tasks are finished.
    await Task.WhenAll(tasks);
}

Or if you need a synchronous version:

void SaveFilesInDb(IEnumerable<string> fileNames, string dbConnectionString)
{
    // start all Tasks, do not await yet:
    List<Task> tasks = new List<Task>();
    foreach (string fileName in fileNames)
    {
        Task task = SaveFileInDbAsync(fileName, dbConnectionString);
        tasks.Add(task);
    }

    // Block until all Tasks are finished (WaitAll takes an array).
    Task.WaitAll(tasks.ToArray());
}

Importing Data in Parallel in SQL Server

  1. Load the file path details into a tracking table

    CREATE TABLE FileListCollection (Id int IDENTITY(1,1), filepath VARCHAR(500), ThreadNo tinyint, isLoaded int)

    DECLARE @FileListCollection TABLE (filepath VARCHAR(500))
    DECLARE @folderpath NVARCHAR(500)
    DECLARE @cmd NVARCHAR(100)
    SET @folderpath = '<FolderPath>'
    SET @cmd = 'dir ' + @folderpath + ' /b /s'

    INSERT INTO @FileListCollection
    EXECUTE xp_cmdshell @cmd

    DELETE
    FROM @FileListCollection
    WHERE filepath IS NULL

    insert into FileListCollection(filepath, isLoaded)
    select filepath, 0
    from @FileListCollection
  2. Assign each file to a thread number

    DECLARE @ThreadNo int = 3
    UPDATE f SET ThreadNo = (Id % @ThreadNo)
    FROM FileListCollection f
  3. Open three sessions and assign a thread number to each

  4. Run the script below to load the data

    DECLARE @filepath NVARCHAR(500)
    DECLARE @bcpquery NVARCHAR(MAX);
    DECLARE @ThreadNo int = 1
    WHILE EXISTS (
        SELECT TOP 1 *
        FROM FileListCollection
        WHERE ThreadNo = @ThreadNo
        AND isLoaded = 0
    )
    BEGIN
        SELECT TOP 1 @filepath = filepath
        FROM FileListCollection
        WHERE ThreadNo = @ThreadNo
        AND isLoaded = 0

        SET @bcpquery = 'bulk insert <Database>.dbo.Table from ''' + @filepath + ''' with (fieldterminator = ''|'', rowterminator = ''\n'')';

        PRINT @bcpquery
        --Load the content into the table
        EXECUTE sp_executesql @bcpquery;

        UPDATE FileListCollection SET isLoaded = 1
        WHERE filepath = @filepath
    END

How to get Postgres to do parallel inserts with a CTE query?

There is no way to parallelize INSERTs in PostgreSQL, except by opening several database connections and using them in parallel.
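
A minimal sketch of that workaround in C#, assuming the Npgsql driver (using Npgsql; using System.Linq) and a hypothetical single-column target table; each Task opens its own connection, so the INSERTs run in parallel sessions:

async Task ParallelInsertAsync(IEnumerable<string[]> chunks, string connectionString)
{
    // one Task, and therefore one connection/session, per chunk of rows
    IEnumerable<Task> tasks = chunks.Select(async chunk =>
    {
        using (var connection = new NpgsqlConnection(connectionString))
        {
            await connection.OpenAsync();
            foreach (string value in chunk)
            {
                using (var command = new NpgsqlCommand(
                    "INSERT INTO target_table (payload) VALUES (@p)", connection))
                {
                    command.Parameters.AddWithValue("p", value);
                    await command.ExecuteNonQueryAsync();
                }
            }
        }
    });
    await Task.WhenAll(tasks);
}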

How to insert data in parallel into three different tables

Statements execute synchronously within a T-SQL batch. To execute multiple statements asynchronously and in parallel from a stored procedure, you'll need to use multiple concurrent database connections. Note that the tricky part with asynchronous execution is determining not only when all the tasks have completed, but also whether they have succeeded or failed.

Method 1: SSIS package

Create an SSIS package to execute the 3 SQL statements in parallel. In SQL 2012 and later, run the package using SSIS catalog stored procedures. Pre-SQL 2012, you'll need to create a SQL Agent job for the package and launch it with sp_start_job.

You'll need to check the SSIS execution status or SQL Agent job status to determine completion, and success/failure result.
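
For instance, with the SQL 2012+ catalog procedures (the folder, project, and package names here are hypothetical):

DECLARE @execution_id bigint;
EXEC SSISDB.catalog.create_execution
      @folder_name = N'ImportFolder'
    , @project_name = N'ParallelImport'
    , @package_name = N'LoadThreeTables.dtsx'
    , @execution_id = @execution_id OUTPUT;
EXEC SSISDB.catalog.start_execution @execution_id;

--poll for completion (status 2 = running, 7 = succeeded, 4 = failed)
SELECT status FROM SSISDB.catalog.executions WHERE execution_id = @execution_id;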

Method 2: PowerShell and SQL Agent

Execute a SQL Agent job that runs a PowerShell script that executes the queries in parallel using PowerShell background jobs (the Start-Job command). The script can return an exit code, zero for success and non-zero for failure, so that SQL Agent can determine if it succeeded. Check SQL Agent job status to determine completion, and success/failure result.

Method 3: Multiple SQL Agent jobs

Execute multiple SQL Agent jobs concurrently, each with a T-SQL job step containing the import script. Check SQL Agent job status of each job to determine completion, and success/failure result.
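
A sketch, assuming three pre-created jobs (the job names are hypothetical); sp_start_job returns as soon as a job is started, so all three run concurrently:

EXEC msdb.dbo.sp_start_job @job_name = N'ImportTable1';
EXEC msdb.dbo.sp_start_job @job_name = N'ImportTable2';
EXEC msdb.dbo.sp_start_job @job_name = N'ImportTable3';

--poll each job's current execution status
EXEC msdb.dbo.sp_help_job @job_name = N'ImportTable1', @job_aspect = N'JOB';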

Method 4: Service Broker

Use a queue-activated proc to execute the import scripts in parallel. This can be obtuse if you haven't used Service Broker before, and it is important to follow vetted patterns. I've included an example to get you started (replace THROW with RAISERROR for pre-SQL 2012). The database must have Service Broker enabled, which is the default but is turned off following a restore or attach.

USE YourDatabase;
GO

--create proc that will be automatically executed (activated) when requests are waiting
CREATE PROC dbo.ExecuteTSqlTask
AS
SET NOCOUNT ON;

DECLARE
      @TSqlJobConversationHandle uniqueidentifier = NEWID()
    , @TSqlExecutionRequestMessage xml
    , @TSqlExecutionResultMessage xml
    , @TSqlExecutionResult varchar(10)
    , @TSqlExecutionResultDetails nvarchar(MAX)
    , @TSqlScript nvarchar(MAX)
    , @TSqlTaskName sysname
    , @RowsAffected int
    , @message_type_name sysname;

WHILE 1 = 1
BEGIN

    --get the next task to execute
    WAITFOR (
        RECEIVE TOP (1)
              @TSqlJobConversationHandle = conversation_handle
            , @TSqlExecutionRequestMessage = CAST(message_body AS xml)
            , @message_type_name = message_type_name
        FROM dbo.TSqlExecutionQueue
    ), TIMEOUT 1000;

    IF @@ROWCOUNT = 0
    BEGIN
        --no work to do - exit
        BREAK;
    END;

    IF @message_type_name = N'TSqlExecutionRequest'
    BEGIN

        --get task name and script
        SELECT
              @TSqlTaskName = @TSqlExecutionRequestMessage.value('(/TSqlTaskName)[1]', 'sysname')
            , @TSqlScript = @TSqlExecutionRequestMessage.value('(/TSqlScript)[1]', 'nvarchar(MAX)');

        --execute script
        BEGIN TRY
            EXEC sp_executesql @TSqlScript;
            SET @RowsAffected = @@ROWCOUNT;
            SET @TSqlExecutionResult = 'Completed';
            SET @TSqlExecutionResultDetails = CAST(@RowsAffected AS varchar(10)) + ' rows affected';
        END TRY
        BEGIN CATCH
            SET @TSqlExecutionResult = 'Erred';
            SET @TSqlExecutionResultDetails =
                  'Msg ' + CAST(ERROR_NUMBER() AS varchar(10))
                + ', Level ' + CAST(ERROR_SEVERITY() AS varchar(2))
                + ', State ' + CAST(ERROR_STATE() AS varchar(10))
                + ', Line ' + CAST(ERROR_LINE() AS varchar(10))
                + ': ' + ERROR_MESSAGE();
        END CATCH;

        --send execution result back to initiator
        SET @TSqlExecutionResultMessage = '<TSqlTaskName /><TSqlExecutionResult /><TSqlExecutionResultDetails />';
        SET @TSqlExecutionResultMessage.modify('insert text {sql:variable("@TSqlTaskName")} into (/TSqlTaskName)[1] ');
        SET @TSqlExecutionResultMessage.modify('insert text {sql:variable("@TSqlExecutionResult")} into (/TSqlExecutionResult)[1] ');
        SET @TSqlExecutionResultMessage.modify('insert text {sql:variable("@TSqlExecutionResultDetails")} into (/TSqlExecutionResultDetails)[1] ');
        SEND ON CONVERSATION @TSqlJobConversationHandle
            MESSAGE TYPE TSqlExecutionResult
            (@TSqlExecutionResultMessage);

    END
    ELSE
    BEGIN
        IF @message_type_name = N'TSqlJobComplete'
        BEGIN
            --service has ended conversation so we're not going to get any more execution requests
            END CONVERSATION @TSqlJobConversationHandle;
        END
        ELSE
        BEGIN
            END CONVERSATION @TSqlJobConversationHandle WITH ERROR = 1 DESCRIPTION = 'Unexpected message type received by ExecuteTSqlTask';
            RAISERROR('Unexpected message type received (%s) by ExecuteTSqlTask', 16, 1, @message_type_name);
        END;
    END;
END;
GO

CREATE QUEUE dbo.TSqlResultQueue;
CREATE QUEUE dbo.TSqlExecutionQueue
    WITH STATUS = ON,
    ACTIVATION (
          STATUS = ON
        , PROCEDURE_NAME = dbo.ExecuteTSqlTask
        , MAX_QUEUE_READERS = 3 --max number of concurrent activated proc instances
        , EXECUTE AS OWNER
    );
CREATE MESSAGE TYPE TSqlExecutionRequest VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE TSqlExecutionResult VALIDATION = WELL_FORMED_XML;
CREATE MESSAGE TYPE TSqlJobComplete VALIDATION = WELL_FORMED_XML;
CREATE CONTRACT TSqlExecutionContract (
      TSqlExecutionRequest SENT BY INITIATOR
    , TSqlJobComplete SENT BY INITIATOR
    , TSqlExecutionResult SENT BY TARGET
);
CREATE SERVICE TSqlJobService ON QUEUE dbo.TSqlResultQueue ([TSqlExecutionContract]);
CREATE SERVICE TSqlExecutorService ON QUEUE dbo.TSqlExecutionQueue ([TSqlExecutionContract]);
GO

CREATE PROC dbo.ExecuteParallelImportScripts
AS
SET NOCOUNT ON;

DECLARE
      @TSqlJobConversationHandle uniqueidentifier
    , @TSqlExecutionRequestMessage xml
    , @TSqlExecutionResultMessage xml
    , @TSqlExecutionResult varchar(10)
    , @TSqlExecutionResultDetails nvarchar(MAX)
    , @TSqlTaskName sysname
    , @CompletedCount int = 0
    , @ErredCount int = 0
    , @message_type_name sysname;

DECLARE @TSqlTask TABLE (
      TSqlTaskName sysname NOT NULL PRIMARY KEY
    , TSqlScript nvarchar(MAX) NOT NULL
);

BEGIN TRY

    --insert a row for each import task
    INSERT INTO @TSqlTask(TSqlTaskName, TSqlScript)
        VALUES(N'ImportScript1', N'INSERT INTO dbo.Table1 SELECT * FROM dbo.Table1Staging;');
    INSERT INTO @TSqlTask(TSqlTaskName, TSqlScript)
        VALUES(N'ImportScript2', N'INSERT INTO dbo.Table2 SELECT * FROM dbo.Table2Staging;');
    INSERT INTO @TSqlTask(TSqlTaskName, TSqlScript)
        VALUES(N'ImportScript3', N'INSERT INTO dbo.Table3 SELECT * FROM dbo.Table3Staging;');

    --start a conversation for this import process
    BEGIN DIALOG CONVERSATION @TSqlJobConversationHandle
        FROM SERVICE TSqlJobService
        TO SERVICE 'TSqlExecutorService', 'CURRENT DATABASE'
        ON CONTRACT TSqlExecutionContract
        WITH ENCRYPTION = OFF;

    --send import tasks to executor service for parallel execution
    DECLARE JobTasks CURSOR LOCAL FAST_FORWARD FOR
        SELECT (SELECT TSqlTaskName, TSqlScript
                FROM @TSqlTask AS task
                WHERE task.TSqlTaskName = job.TSqlTaskName
                FOR XML PATH(''), TYPE) AS TSqlExecutionRequest
        FROM @TSqlTask AS job;
    OPEN JobTasks;
    WHILE 1 = 1
    BEGIN
        FETCH NEXT FROM JobTasks INTO @TSqlExecutionRequestMessage;
        IF @@FETCH_STATUS = -1 BREAK;
        SEND ON CONVERSATION @TSqlJobConversationHandle
            MESSAGE TYPE TSqlExecutionRequest
            (@TSqlExecutionRequestMessage);
    END;
    CLOSE JobTasks;
    DEALLOCATE JobTasks;

    --get each parallel task execution result until all are complete
    WHILE 1 = 1
    BEGIN

        --get next task result
        WAITFOR (
            RECEIVE TOP (1)
                  @TSqlExecutionResultMessage = CAST(message_body AS xml)
                , @message_type_name = message_type_name
            FROM dbo.TSqlResultQueue
            WHERE conversation_handle = @TSqlJobConversationHandle
        ), TIMEOUT 1000;

        IF @@ROWCOUNT <> 0
        BEGIN

            IF @message_type_name = N'TSqlExecutionResult'
            BEGIN

                --get result of import script execution
                SELECT
                      @TSqlTaskName = @TSqlExecutionResultMessage.value('(/TSqlTaskName)[1]', 'sysname')
                    , @TSqlExecutionResult = @TSqlExecutionResultMessage.value('(/TSqlExecutionResult)[1]', 'varchar(10)')
                    , @TSqlExecutionResultDetails = COALESCE(@TSqlExecutionResultMessage.value('(/TSqlExecutionResultDetails)[1]', 'nvarchar(MAX)'), N'');
                RAISERROR('Import task %s %s: %s', 0, 0, @TSqlTaskName, @TSqlExecutionResult, @TSqlExecutionResultDetails) WITH NOWAIT;
                IF @TSqlExecutionResult = 'Completed'
                BEGIN
                    SET @CompletedCount += 1;
                END
                ELSE
                BEGIN
                    SET @ErredCount += 1;
                END;

                --remove task from tracking table after completion
                DELETE FROM @TSqlTask
                WHERE TSqlTaskName = @TSqlTaskName;

                IF NOT EXISTS(SELECT 1 FROM @TSqlTask)
                BEGIN
                    --all tasks are done - send TSqlJobComplete message to instruct executor service to end conversation
                    SEND ON CONVERSATION @TSqlJobConversationHandle
                        MESSAGE TYPE TSqlJobComplete;
                END
            END
            ELSE
            BEGIN
                IF @message_type_name = N'http://schemas.microsoft.com/SQL/ServiceBroker/EndDialog'
                BEGIN
                    --executor service has ended conversation so we're done
                    END CONVERSATION @TSqlJobConversationHandle;
                    BREAK;
                END
                ELSE
                BEGIN
                    END CONVERSATION @TSqlJobConversationHandle WITH ERROR = 1 DESCRIPTION = 'Unexpected message type received by ExecuteParallelImportScripts';
                    RAISERROR('Unexpected message type received (%s) by ExecuteParallelImportScripts', 16, 1, @message_type_name);
                END;
            END
        END;
    END;
    RAISERROR('Import processing completed. CompletedCount=%d, ErredCount=%d.', 0, 0, @CompletedCount, @ErredCount);
END TRY
BEGIN CATCH
    THROW;
END CATCH;
GO

--execute import scripts in parallel
EXEC dbo.ExecuteParallelImportScripts;
GO

Block parallel inserts in a TABLE

Thanks @Mitch Wheat for pointing out the XY problem. I have narrowed down what I needed to do.

The load_id_by_int (formerly load_id_by_date) is now generated from a bigint representation of NEWID(). The chance of conflict is now acceptable (at least in my opinion). Thanks for the assistance, everyone who commented.

CREATE TABLE my_table
(
    load_id uniqueidentifier NOT NULL,
    load_date datetime NOT NULL DEFAULT (GETDATE()),
    load_id_by_int bigint NOT NULL DEFAULT (ABS(convert(bigint, convert(varbinary(8), NEWID(), 1)))),
    is_processed bit DEFAULT (0),
    PRIMARY KEY (load_id_by_int)
)

The concept was derived from Convert from UniqueIdentifier to BigInt and Back?

How to do very fast inserts to SQL Server 2008

ExecuteNonQuery with an INSERT statement, or even a stored procedure, will get you into the thousands-of-inserts-per-second range on Express. 4000-5000/sec is easily achievable; I know this for a fact.

What usually slows down individual inserts is the wait time for the log flush, and you need to account for that. The easiest solution is to simply batch commits, e.g. commit every 1000 inserts, or every second. This will fill up the log pages and amortize the cost of the log-flush wait over all the inserts in a transaction.
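
A sketch of batch commits from client code (System.Data.SqlClient; the table and column names are hypothetical): one transaction per 1000 rows pays the log-flush wait once per batch instead of once per INSERT.

void InsertWithBatchCommits(IEnumerable<string> values, string connectionString)
{
    const int batchSize = 1000;
    using (var connection = new SqlConnection(connectionString))
    {
        connection.Open();
        SqlTransaction transaction = connection.BeginTransaction();
        int count = 0;
        foreach (string value in values)
        {
            using (var command = new SqlCommand(
                "INSERT INTO dbo.TargetTable (payload) VALUES (@p)", connection, transaction))
            {
                command.Parameters.AddWithValue("@p", value);
                command.ExecuteNonQuery();
            }
            if (++count % batchSize == 0)
            {
                transaction.Commit();                      // one log flush per batch
                transaction = connection.BeginTransaction();
            }
        }
        transaction.Commit(); // commit the final partial batch
    }
}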

With batch commits you'll probably bottleneck on disk log write performance, and there is nothing you can do about that short of changing the hardware (e.g. a RAID 0 stripe for the log).

If you hit earlier bottlenecks (unlikely) then you can look into batching statements, i.e. sending one single T-SQL batch with multiple INSERTs in it. But this seldom pays off.

Of course, you'll need to reduce the size of your writes to a minimum, meaning reduce the width of your table to the minimally needed columns, eliminate non-clustered indexes, and eliminate unneeded constraints. If possible, use a heap instead of a clustered index, since heap inserts are significantly faster than clustered-index ones.

There is little need to use the fast insert interface (i.e. SqlBulkCopy). Using ordinary INSERTs and ExecuteNonQuery with batch commits, you'll exhaust the drive's sequential write throughput long before you need to deploy bulk insert. Bulk insert is needed on fast, SAN-connected machines, and you mention Express, so that's probably not the case. There is a perception to the contrary out there, but that is simply because people don't realize that bulk insert gives them batch commit, and it's the batch commit that speeds things up, not the bulk insert.

As with any performance test, make sure you eliminate randomness, and preallocate the database and the log; you don't want to hit a database or log growth event during test measurements or in production. That is sooo amateurish.


