Multiprocessing with ArcGIS – Approaches and Considerations (Part 1)

The multiprocessing Python module provides functionality for distributing work between multiple processes on a given machine, taking advantage of multiple CPU cores and larger amounts of available system memory. When analyzing or working with large amounts of data in ArcGIS, there are scenarios where multiprocessing can improve performance and scalability. However, there are many cases where multiprocessing can negatively affect performance, and even some instances where it should not be used.
There are two approaches to using multiprocessing for improving performance or scalability:

  1. Processing many individual datasets.
  2. Processing datasets with many features.

The goal of this article is to share simple coding patterns for effectively performing multiprocessing for geoprocessing. The article will cover relevant considerations and limitations, which are important when attempting to implement this approach.

1. Processing large numbers of datasets

The first example covers performing a specific operation on a large number of datasets, in a workspace or set of workspaces. In cases where there are large numbers of datasets, taking advantage of multiprocessing can help get the job done faster. The following code demonstrates a multiprocessing module used to define a projection, add a field, and calculate the field for a large list of shapefiles. This Python code is a simple pattern, which will create a pool of processes equal to the number of CPUs or CPU cores available. This pool of processes will then be used to processes the feature classes.

import os
import re
import multiprocessing
import arcpy

def update_shapefiles(shapefile):
    '''Worker function'''

    # Define the projection to wgs84 -- factory code is 4326.
    arcpy.management.DefineProjection(shapefile, 4326)

    # Add a field named CITY of type TEXT.
    arcpy.management.AddField(shapefile, 'CITY', 'TEXT')

    # Calculate field 'CITY' stripping '_base' from
    # the shapefile name.
    city_name = shapefile.split('_base')[0]
    city_name = re.sub('_', ' ', city_name)
    arcpy.management.CalculateField(shapefile, 'CITY',
                    '"{0}"'.format(city_name.upper()), 'PYTHON')

# End update_shapefiles
def main():
    ''' Create a pool class and run the jobs.'''
    # The number of jobs is equal to the number of shapefiles
    workspace = 'C:/GISData/USA/usa'
    arcpy.env.workspace = workspace
    fcs = arcpy.ListFeatureClasses('*')
    fc_list = [os.path.join(workspace, fc) for fc in fcs]
    pool = multiprocessing.Pool()
    pool.map(update_shapefiles, fc_list)

    # Synchronize the main process with the job processes to
    # ensure proper cleanup.
    pool.close()
    pool.join()
    # End main

if __name__ == '__main__':
    main()

2. Processing a individual dataset with a lot of features and records

This second example looks at Geoprocessing tools analyzing on an individual dataset with a lot of features and records. In this situation we can benefit from multiprocessing, by splitting data into groups to be processed simultaneously.

Figure 1: A fishnet of polygons covering the extent of one million points

For example, finding identical features may be faster when you split a large feature class into groups, based on spatial extents.
The following code uses a pre-defined fishnet of polygons covering the extent of 1 million points (Figure 1).

import multiprocessing
import numpy
import arcpy
arcpy.env.overwriteOutput = True

def find_identical(oid):
    '''Worker function to perform Find Identical, and return'''
    '''a list of numpy arrays as the results.'''
    # Create a feature layer for the tile in the fishnet.
    tile = arcpy.management.MakeFeatureLayer(
                        'C:/testing/testing.gdb/fishnet',
                        'layer{0}'.format(oid[0]), 
                        """OID = {0}""".format((oid[0])))

    # Get the extent of the feature layer and set the environment
    # to use during Find Identical.
    tile_row = arcpy.da.SearchCursor(tile, 'shape@')
    geometry = tile_row.next()[0]
    arcpy.env.extent = geometry.extent

    # Execute Find Identical
    identical_table = arcpy.management.FindIdentical(
                         'C:/testing/testing.gdb/Points', 
                         'in_memory/identical', 'Shape')

    # Convert the resulting table into a numpy array and return
    result_array = arcpy.da.TableToNumPyArray(identical_table, ["*"])
    return result_array
    # End find_identical

def main():
    # Create a list of OID's used to chunk the inputs
    fishnet_rows = arcpy.SearchCursor(
              'C:/testing/testing.gdb/fishnet', '', '', 'OID')
    oids = [[row.getValue('OID')] for row in fishnet_rows]

    # Create a pool class and run the jobs--
    # the number of jobs is equal to the length of the oids list
    pool = multiprocessing.Pool()
    result_arrays = pool.map(find_identical, oids)

    # Concatenate the resulting arrays and create an output table
    # reporting any identical records.
    result_array = numpy.concatenate(result_arrays,axis=0)
    arcpy.da.NumPyArrayToTable(result_array, 
                                'C:/testing/testing.gdb/identical')

    # Synchronize the main process with the job processes to ensure
    # proper cleanup.
    pool.close()
    pool.join()
    # End main

if __name__ == '__main__':
    main()

The example above splits the data using spatial extents. However, there are tools that do not require data be split spatially. The Generate Near Table example below, shows the data processed in groups of 250000 features by selecting them based on object ID ranges.

import multiprocessing
import numpy
import arcpy
def generate_near_table(ranges):
    i, j = ranges[0], ranges[1]
    lyr = arcpy.management.MakeFeatureLayer(
                 'c:/testing/testing.gdb/random1mil',
                  'layer{0}'.format(i), """OID >= {0} AND
                  OID <= {1}""".format((i, j)))
    gn_table = arcpy.analysis.GenerateNearTable(
                  lyr, 'c:/testing/testing.gdb/random300',
                  'in_memory/outnear{0}'.format(i))
    result_array = arcpy.da.TableToNumPyArray(gn_table, ["*"])
    arcpy.management.Delete(gn_table)
    return result_array
    # End generate_near_table function

def main():
    ranges = [[0, 250000], [250001, 500000], [500001, 750000],
                    [750001, 1000001]]
    # Create a pool class and run the jobs--the number of jobs is
    # equal to the length of the oids list
    pool = multiprocessing.Pool()
    result_arrays = pool.map(generate_near_table, ranges)

    # Concatenate the resulting arrays and create an output table
    # reporting any identical records.
    result_array = numpy.concatenate(result_arrays,axis=0)
    arcpy.da.NumPyArrayToTable(result_array, 'c:/testing/testing.gdb/gn3')

    # Synchronize the main process with the job processes to
    # Ensure proper cleanup.
    pool.close()
    pool.join()
    # End main

if __name__ == '__main__':
    main()

Considerations

Here are some important considerations before deciding to use multiprocessing:

  1. The scenario demonstrated in the first example, will not work with feature classes in a file geodatabase because each update must acquire a schema lock on the workspace. A schema lock effectively prevents any other process from simultaneously updating the FGDB. This example will however, work with shapefiles and ArcSDE geodatabase data.
  2. For each process, there is a start-up cost in loading the arcpy library (1-3 seconds). Depending on the complexity and size of the data, this can cause the multiprocessing script to take longer to run than a script without multiprocessing. In many cases, the final step in the multiprocessing workflow is to aggregate all results together which is an additional cost.
  3. Determining if multiprocessing is appropriate for your workflow can often be a trial and error process. This process can invalidate the gains made using multiprocessing in a one off operation; however, the trial and error process may be very valuable if the final workflow is to be run multiple times, or applied to similar workflows using large data. For example, if you are running the Find Identical tool on a weekly basis, and it is running for hours with your data, multiprocessing may be worth the effort.
  4. Whenever possible, take advantage of the “in_memory” workspace for creating temporary data. It can improve performance rather than writing data to disk. However, depending on the size of data being created in-memory, it may be necessary to write temporary data to disk. Temporary datasets cannot be created in a file geodatabase because of schema locking. Deleting the in-memory dataset when you are finished can prevent out of memory errors.

Summary

    These are just a few examples showing how multiprocessing can be used to increase performance and scalability when doing geoprocessing. However, it is important to remember that multiprocessing does not always mean better performance.
    The multiprocessing module was included in Python 2.6 and the examples above will work in ArcGIS 10.1. For more information about the multiprocessing module, refer the Python documentation,http://docs.python.org/library/multiprocessing.html
    Please provide any feedback and comments to this blog posting, and stay tuned for another posting coming soon about “Being successful processing large complex data with the geoprocessing overlay tools”.
    This post was contributed by Jason Pardy, a product engineer on the analysis and geoprocessing team.
This entry was posted in Analysis & Geoprocessing, Python and tagged , , , , , . Bookmark the permalink.

Leave a Reply

7 Comments

  1. offermann says:

    Nice article about the considerations of using the power of multiple CPUs, especially the dependecies to the underlying workspace type (no fgdb). I some times have problems when it comes to geoprocessing of many features in a feature class. It would be helpful to write an article of how to separate one single feature class to many smaller feature classes, or an automatically created “fishnet” feature class like in the second example.

    • bruce_harold says:

      Hi

      Here is a code snippet I use to break tables or feature classes up into chunks of arbitrary count, by creating a SQL expression usable for cursors, feature sets etc. It creates a list of expressions for ObjectID ranges defining the chunks.

      inDesc = arcpy.Describe(inTable)
      oidName = arcpy.AddFieldDelimiters(inTable,inDesc.oidFieldName)
      sql = ‘%s = (select min(%s) from %s)’ % (oidName,oidName,os.path.basename(inTable))
      cur = arcpy.da.SearchCursor(inTable,[inDesc.oidFieldName],sql)
      minOID = cur.next()[0]
      del cur, sql
      sql = ‘%s = (select max(%s) from %s)’ % (oidName,oidName,os.path.basename(inTable))
      cur = arcpy.da.SearchCursor(inTable,[inDesc.oidFieldName],sql)
      maxOID = cur.next()[0]
      del cur, sql
      breaks = range(minOID,maxOID)[0:-1:2000] #2K slices
      breaks.append(maxOID+1)
      exprList = [oidName + ' >= ' + str(breaks[b]) + ‘ and ‘ +
      oidName + ‘ < ' + str(breaks[b+1]) for b in range(len(breaks)-1)]

      • g3martin says:

        Thanks for posting this.

      • rviger says:

        Yeah, thanks for the code. Just a caveat for others cutting-and-pasting this, I needed to replace the single quotes used in setting the “sql” variables. I don’t think it’s a code error, just something funky in the text representation (not UTF-8?) on either this web site or my fairly standard configuration of Win7.

  2. harrybowman says:

    Why doesn’t this cause a conflict in creating & writing to the in_memoryidentical table? Is there one per process?

  3. amarinelli says:

    Found that their were issues when multiprocessing with data in ArcSDE. The necessary functions would complete (AddField, Calculate Field) but the pool would never join/close to complete the script (i.e. script would hang). Clear Workspace Cache (Data Management) helped.

    # Release hold on ArcSDE workspace created in previous steps.
    arcpy.env.workspace = ""

    # Execute the Clear Workspace Cache tool
    # If you do not specify a connection, all ArcSDE workspaces will be removed from the Cache
    arcpy.ClearWorkspaceCache_management()