An “Entity Bulk-Compare-Insert” method I wrote for copying a large amount of data

Just sharing a class I created that copies a couple of million rows.

Instead of relying on a unique index in the destination database, I load the existing keys into a simple HashSet and do a quick lookup on each row to make sure it is not already in the database.

Dispose calls are used quite a bit… Without calling Dispose on the entity contexts and the completed task items, memory is consumed pretty quickly, and… it crashes.

Average copy times (judged from network traffic on the SQL Server) are just as fast as a bulk copy, with far less CPU usage on the SQL Server (no index calculation).

However, on the machine that runs this workflow, it consumes about 1 GB of memory on average, and… the CPU actually has to do some work.

Just sharing; feel free to critique or comment.

/// <summary>
/// Copies SCOM state-change rows (StateChangeEventView joined to Monitor) into the
/// SCOM_StateData_RAW table of the SLO database, skipping rows whose StateChangeEventId
/// is already present there, and then stamps the "LastSync" setting.
/// </summary>
public class SLO_Import_SCOM_Data_ALL
{
    /// <summary>Number of rows collected before a batch is handed to a background save.</summary>
    private const int BatchSize = 10000;

    /// <summary>Maximum number of batch saves allowed to be outstanding at the same time.</summary>
    private const int MaxConcurrentSaves = 10;

    /// <summary>
    /// Source query run against the SCOM database. The column aliases are read by name in
    /// <see cref="Start"/>, so they must stay in sync with the reader code.
    /// </summary>
    private const string StateDataQuery = @" 

SELECT sv.BaseManagedEntityId as ID, sv.OldHealthState as OldState, sv.NewHealthState as NewState, sv.TimeGenerated as TimeGenerated, M.MonitorCategory as MonitorCategory, M.MonitorName as MonitorName, M.MonitorId as MonitorId, sv.StateChangeEventId as StateChangeEventId FROM StateChangeEventView sv JOIN Monitor M on sv.MonitorId = m.MonitorId ORDER BY ID, TimeGenerated ";

    private readonly ILogger log;

    /// <summary>Creates the importer and names its log "SLO" / "Import-SCOMData".</summary>
    /// <param name="ILOG">Logger used for all progress output.</param>
    /// <exception cref="ArgumentNullException"><paramref name="ILOG"/> is null.</exception>
    public SLO_Import_SCOM_Data_ALL(ILogger ILOG)
    {
        if (ILOG == null)
        {
            throw new ArgumentNullException("ILOG");
        }

        log = ILOG;
        log.SetLogName("SLO", "Import-SCOMData");
    }

    /// <summary>
    /// This method will bulk copy all state data into the SLO database.
    /// Note: Object ID must be in SLO_objects table in order for it to be copied.
    /// NOTE(review): nothing in this class checks SLO_objects; presumably a foreign key in
    /// SLODB enforces it - confirm.
    /// </summary>
    /// <remarks>
    /// Existing StateChangeEventIds are loaded into a HashSet once. Each new row is checked
    /// against (and added to) that set, so a key is inserted at most once per run. New rows are
    /// saved in batches of <see cref="BatchSize"/> on separate contexts, with at most
    /// <see cref="MaxConcurrentSaves"/> saves in flight at once.
    /// </remarks>
    /// <exception cref="AggregateException">One or more batch saves failed; thrown once all
    /// outstanding saves have finished. "LastSync" is not updated in that case.</exception>
    /// <exception cref="InvalidOperationException">The "LastSync" setting does not exist.</exception>
    public void Start()
    {
        log.Info("Starting workflow.");
        var timer = System.Diagnostics.Stopwatch.StartNew();

        using (var context = new SLODB())
        using (var conSCOM = new SqlConnection(System.Configuration.ConfigurationManager.ConnectionStrings["SCOM"].ConnectionString))
        {
            context.Configuration.AutoDetectChangesEnabled = false;
            context.Configuration.ProxyCreationEnabled = false;
            context.Configuration.LazyLoadingEnabled = false;
            conSCOM.Open();

            // Keys already copied; also receives every key copied during this run so that a
            // duplicate in the source cannot be inserted twice (the destination has no unique index).
            var exists = new HashSet<Guid>(context.SCOM_StateData_RAW.AsNoTracking().Select(o => o.StateChangeEventId));
            var records = new List<SCOM_StateData_RAW>(BatchSize);
            var saveTasks = new List<Task>();
            int totalRows = 0;
            int copiedRows = 0;

            log.Info("Executing reader.");
            using (var cmd_StateData = new SqlCommand(StateDataQuery, conSCOM))
            using (SqlDataReader rdr = cmd_StateData.ExecuteReader())
            {
                log.Info("Copying results.");
                while (rdr.Read())
                {
                    totalRows++;
                    var scomRowId = (Guid)rdr["StateChangeEventId"];

                    // HashSet.Add returns false when the key is already known: skip the row.
                    if (!exists.Add(scomRowId))
                    {
                        continue;
                    }

                    records.Add(new SCOM_StateData_RAW()
                    {
                        ID = (Guid)rdr["ID"],
                        OldState = (byte)rdr["OldState"],
                        NewState = (byte)rdr["NewState"],
                        // NOTE(review): assumes SCOM stores TimeGenerated in UTC - confirm.
                        TimeGenerated = ((DateTime)rdr["TimeGenerated"]).ToLocalTime(),
                        MonitorCategory = (string)rdr["MonitorCategory"],
                        MonitorName = (string)rdr["MonitorName"],
                        MonitorId = (Guid)rdr["MonitorId"],
                        StateChangeEventId = scomRowId
                    });
                    copiedRows++;

                    if (records.Count >= BatchSize)
                    {
                        log.Info("flushing 10,000 records to save queue.");
                        QueueSave(records, saveTasks);
                        records = new List<SCOM_StateData_RAW>(BatchSize);
                    }
                }
            }

            // Flush remaining records (skip the empty batch when the row count was an exact multiple).
            if (records.Count > 0)
            {
                saveTasks.Add(UpdateContext(records));
            }

            log.Info("Waiting on async tasks to complete.");
            Task.WaitAll(saveTasks.ToArray());
            log.Info("Tasks completed.");

            var lastSync = context.Settings.Find("LastSync");
            if (lastSync == null)
            {
                throw new InvalidOperationException("The 'LastSync' setting was not found in SLODB.");
            }
            lastSync.TimeStamp = DateTime.Now;
            context.SaveChanges();

            timer.Stop();
            log.Success("Copied {1} out of {2} total rows from SCOM to SLODB in {0} seconds.", null, timer.Elapsed.TotalSeconds, copiedRows, totalRows);
        }
    }

    /// <summary>
    /// Starts a background save of <paramref name="batch"/>. It first drops saves that finished
    /// successfully, then blocks while <see cref="MaxConcurrentSaves"/> saves are still pending.
    /// </summary>
    /// <param name="batch">Rows to insert; ownership passes to the save task.</param>
    /// <param name="saveTasks">Outstanding save tasks; updated in place.</param>
    /// <exception cref="AggregateException">An earlier save faulted or was cancelled. Thrown by
    /// <see cref="Task.WaitAll(Task[])"/> after every outstanding save has finished.</exception>
    private void QueueSave(List<SCOM_StateData_RAW> batch, List<Task> saveTasks)
    {
        // Removing finished tasks releases the batches they reference.
        var completedTasks = saveTasks.Where(o => o.Status == TaskStatus.RanToCompletion).ToList();
        log.Info("Disposing of {0} completed tasks.", null, completedTasks.Count);
        foreach (var completed in completedTasks)
        {
            saveTasks.Remove(completed);
            completed.Dispose();
        }

        while (true)
        {
            // Fail fast instead of reading the rest of the source after a batch has failed.
            if (saveTasks.Any(o => o.IsFaulted || o.IsCanceled))
            {
                Task.WaitAll(saveTasks.ToArray());
            }

            // Async-method tasks sit in WaitingForActivation until they finish, so "pending"
            // must mean "not completed" rather than a specific running status.
            Task[] pending = saveTasks.Where(o => !o.IsCompleted).ToArray();
            if (pending.Length < MaxConcurrentSaves)
            {
                break;
            }

            log.Info("Reached thread limit. Waiting for existing threads to complete.");
            Task.WaitAny(pending);
        }

        saveTasks.Add(UpdateContext(batch));
    }

    /// <summary>Inserts <paramref name="records"/> using a dedicated, short-lived SLODB context.</summary>
    /// <param name="records">Rows to insert.</param>
    /// <returns>A task that completes with <c>true</c> once the rows are saved.</returns>
    private async Task<bool> UpdateContext(List<SCOM_StateData_RAW> records)
    {
        // using guarantees the context is disposed even when SaveChangesAsync throws.
        using (var context = new SLODB())
        {
            context.Set<SCOM_StateData_RAW>().AddRange(records);

            // Start() blocks on these tasks synchronously; not capturing a sync context avoids
            // a deadlock if this ever runs under one.
            await context.SaveChangesAsync().ConfigureAwait(false);
        }

        return true;
    }
}

by XtremeOwnage via /r/csharp

Leave a Reply