Building a Simple PCL Interface for Python¶
In this exercise, we will fill in the appropriate pieces of code to build a perception pipeline. The end goal will be to create point cloud filtering operations to demonstrate functionality between ROS and python.
Prepare New Workspace:¶
We will create a new catkin workspace, since this exercise does not overlap with the previous ScanNPlan exercises.
Disable automatic sourcing of your previous catkin workspace:
gedit ~/.bashrcComment out the line of your
.bashrcfile which sources the previous workspacesource /opt/ros/melodic/setup.bash # source ~/catkin_ws/devel/setup.bash
Copy the template workspace layout and files:
cd $HOME git clone https://github.com/cardboardcode/industrial_training.git --single-branch --branch melodic --depth 1 cp -r ~/industrial_training/exercises/python-pcl_ws ~ cd ~/python-pcl_ws/
Initialize and Build this new workspace
catkin init catkin build
Source the workspace
source ~/python-pcl_ws/devel/setup.bashDownload the PointCloud file and place the file in your workspace’s src directory :
cp -r ~/industrial_training/exercises/4.2/table.pcd src/
Intro (Review Existing Code)¶
Most of the infrastructure for a ROS node has already been completed for you; the focus of this exercise is the perception algorithms/pipeline. The CMakelists.txt and package.xml are complete and a source file has been provided. At this time we will explore the source code that has been provided in the py_perception_node.cpp file. This tutorial has been modified from training Exercise 5.1 Building a Perception Pipeline and as such the C++ code has already been set up. Open the perception_node.cpp file and review the filtering functions.
Create a Python Package¶
Now that we have converted several filters to C++ functions, we are ready to call it from a Python node.
In the terminal, change the directory to your src folder. Create a new package called filter_call inside your python-pcl_ws:
cd ~/python-pcl_ws/src/ catkin create pkg filter_call --catkin-deps rospyCheck that your package was created:
ls
Open
CMakeLists.txtin the filter_call package. Uncomment line 21 or wherever you find # catkin_python_setup() and save.gedit filter_call/CMakeLists.txt
Creating setup.py¶
The setup.py file makes your python module available to the entire workspace and subsequent packages. By default, this isn’t created by the catkin_create_pkg command.
In your terminal type
touch filter_call/setup.py gedit filter_call/setup.py
Copy and paste the following to the
setup.pyfile## ! DO NOT MANUALLY INVOKE THIS setup.py, USE CATKIN INSTEAD from distutils.core import setup from catkin_pkg.python_setup import generate_distutils_setup # fetch values from package.xml setup_args = generate_distutils_setup( packages=['filter_call'], package_dir={'': 'include'}, ) setup(**setup_args)
Change
packages = [ . . . ],to your list of strings of the name of the folders inside your include folder. By convention, this will be the same name as the package, orfilter_call. The configuresfilter_call/include/filter_callas a python module available to the whole workspace.Save and close the file.
In order for this folder to be accessed by any other python script, the
__init__.pyfile must exist.Create one in the terminal by typing:
mkdir -p filter_call/include/filter_call touch filter_call/include/filter_call/__init__.py
Publishing the Point Cloud¶
As iterated before, we are creating a ROS C++ node to filter the point cloud when requested by a Python node running a service request for each filtering operation, resulting in a new, aggregated point cloud. Let’s start with modifying our C++ code to publish in a manner supportive to python. Remember, the C++ code is already done so all you need to do is write your python script and view the results in Rviz.
Implement a Voxel Filter¶
In
py_perception_node.cpp, take notice of the function calledfilterCallBack(around line 170). This function will be the entry point for all service calls made by the Python client in order to run point cloud filtering operations.bool filterCallback(py_perception::FilterCloud::Request& request, py_perception::FilterCloud::Response& response) { pcl::PointCloud<pcl::PointXYZ>::Ptr cloud (new pcl::PointCloud<pcl::PointXYZ>); pcl::PointCloud<pcl::PointXYZ>::Ptr filtered_cloud (new pcl::PointCloud<pcl::PointXYZ>); if (request.pcdfilename.empty()) { pcl::fromROSMsg(request.input_cloud, *cloud); ROS_INFO_STREAM("cloud size: " << cloud->size()); } else { pcl::io::loadPCDFile(request.pcdfilename, *cloud); } if (cloud->empty()) { ROS_ERROR("input cloud empty"); response.success = false; return false; } switch (request.operation) { case py_perception::FilterCloud::Request::VOXELGRID : { filtered_cloud = voxelGrid(cloud, 0.01); break; } default : { ROS_ERROR("No valid request found"); return false; } } /* * SETUP RESPONSE */ pcl::toROSMsg(*filtered_cloud, response.output_cloud); response.output_cloud.header=request.input_cloud.header; response.output_cloud.header.frame_id="kinect_link"; response.success = true; return true; }
Within
main, take notice of the lines starting at line 244, this is where we load the parameters used by the various filters.priv_nh_.param<double>("leaf_size", leaf_size_, 0.0f);
Build the package and go into the filter_call package now
Now that we have the framework for the filtering, open your terminal. Make sure you are in the
filter_calldirectory. Create ascriptsfolder.cd filter_call mkdir scripts touch scripts/filter_call.py gedit scripts/filter_call.pyCopy and paste the following code at the top of
filter_call.pyto import necessary libraries:#!/usr/bin/env python import rospy import py_perception.srv from sensor_msgs.msg import PointCloud2
We will create an
ifstatement that contains themainfunction that is called when the node is run from the command line. Paste the following after the import statements:if __name__ == '__main__': try: rospy.init_node('filter_cloud', anonymous=True) rospy.wait_for_service('filter_cloud') rospy.spin() except Exception as e: print("Service call failed: %s" % str(e))
The
rospy.init_nodefunction initializes the node and gives it a nameThe
rospy.wait_for_servicewaits for thefilter_cloudservice.The
rospy.spinis the Python counterpart of theroscpp::spin()function in C++.
Call the service to apply a Voxel Grid filter. Copy and paste the following inside the
tryblock in the line following therospy.wait_for_servicefunction:# ======================= # VOXEL GRID FILTER # ======================= srvp = rospy.ServiceProxy('filter_cloud', py_perception.srv.FilterCloud) req = py_perception.srv.FilterCloudRequest() req.pcdfilename = rospy.get_param('~pcdfilename', '') req.operation = py_perception.srv.FilterCloudRequest.VOXELGRID # FROM THE SERVICE, ASSIGN POINTS req.input_cloud = PointCloud2() # ERROR HANDLING if req.pcdfilename == '': raise Exception('No file parameter found') # PACKAGE THE FILTERED POINTCLOUD2 TO BE PUBLISHED res_voxel = srvp(req) print('response received') if not res_voxel.success: raise Exception('Unsuccessful voxel grid filter operation') # PUBLISH VOXEL FILTERED POINTCLOUD2 pub = rospy.Publisher('/perception_voxelGrid', PointCloud2, queue_size=1, latch=True) pub.publish(res_voxel.output_cloud) print("published: voxel grid filter response")
The python script,
filter_call.py, should now look like this:#!/usr/bin/env python import rospy import py_perception.srv from sensor_msgs.msg import PointCloud2 if __name__ == '__main__': try: rospy.init_node('filter_cloud', anonymous=True) rospy.wait_for_service('filter_cloud') # ======================= # VOXEL GRID FILTER # ======================= srvp = rospy.ServiceProxy('filter_cloud', py_perception.srv.FilterCloud) req = py_perception.srv.FilterCloudRequest() req.pcdfilename = rospy.get_param('~pcdfilename', '') req.operation = py_perception.srv.FilterCloudRequest.VOXELGRID # FROM THE SERVICE, ASSIGN POINTS req.input_cloud = PointCloud2() # ERROR HANDLING if req.pcdfilename == '': raise Exception('No file parameter found') # PACKAGE THE FILTERED POINTCLOUD2 TO BE PUBLISHED res_voxel = srvp(req) print('response received') if not res_voxel.success: raise Exception('Unsuccessful voxel grid filter operation') # PUBLISH VOXEL FILTERED POINTCLOUD2 pub = rospy.Publisher('/perception_voxelGrid', PointCloud2, queue_size=1, latch=True) pub.publish(res_voxel.output_cloud) print("published: voxel grid filter response") rospy.spin() except Exception as e: print("Service call failed: %s" % str(e))
Make the Python file executable. In your terminal:
sudo chmod +x scripts/filter_call.py
Viewing Results¶
Open a new terminal, run
roscore
Open a new terminal and run the C++ filter service node
cd ~/python-pcl_ws source devel/setup.bash rosrun py_perception py_perception_node
Open a new terminal and run the Python service client node. Note your file path may be different.
cd ~/python-pcl_ws source devel/setup.bash rosrun filter_call filter_call.py _pcdfilename:=$PWD/src/table.pcd
Open a new terminal and run the
tf2_rospackage to publish a static coordinate transform from the child frame to the world framesource /opt/ros/melodic/setup.bash rosrun tf2_ros static_transform_publisher 0 0 0 0 0 0 world_frame kinect_link
Open a new terminal and run Rviz
source /opt/ros/melodic/setup.bash rviz -d ~/python-pcl_ws/src/py_perception/rviz/exercise_simplepclinterface.rvizAdd a new PointCloud2 in Rviz
In global options, change the fixed frame to kinect_link or world_frame, and in the PointCloud 2, select your topic to be ‘/perception_voxelGrid’
Note
You may need to uncheck and recheck the PointCloud2.
Implement Pass-Through Filters¶
In
py_perception_node.cppin thepy_perceptionpackage, update the switch on line 191 to look as shown below:switch (request.operation) { case py_perception::FilterCloud::Request::VOXELGRID : { filtered_cloud = voxelGrid(cloud, 0.01); break; } case py_perception::FilterCloud::Request::PASSTHROUGH : { filtered_cloud = passThrough(cloud); break; } default : { ROS_ERROR("No valid request found"); return false; } }
Save your file. Build
python-pcl_wsworkspace by running the following commands.cd ~/python-pcl_ws catkin buildOpen the python node,
filter_call.py, and copy paste the following code after the voxel grid, before therospy.spin(). Keep care to maintain indents:cd $HOME gedit ~/python-pcl_ws/src/filter_call/scripts/filter_call.py
# ======================= # PASSTHROUGH FILTER # ======================= srvp = rospy.ServiceProxy('filter_cloud', py_perception.srv.FilterCloud) req = py_perception.srv.FilterCloudRequest() req.pcdfilename = '' req.operation = py_perception.srv.FilterCloudRequest.PASSTHROUGH # FROM THE SERVICE, ASSIGN POINTS req.input_cloud = res_voxel.output_cloud # PACKAGE THE FILTERED POINTCLOUD2 TO BE PUBLISHED res_pass = srvp(req) print('response received') if not res_voxel.success: raise Exception('Unsuccessful pass through filter operation') # PUBLISH PASSTHROUGH FILTERED POINTCLOUD2 pub = rospy.Publisher('/perception_passThrough', PointCloud2, queue_size=1, latch=True) pub.publish(res_pass.output_cloud) print("published: pass through filter response")
Save and run from the terminal, repeating steps outlined for the voxel filter.
Within Rviz, compare PointCloud2 displays based on the
/kinect/depth_registered/points(original camera data) andperception_passThrough(latest processing step) topics. Part of the original point cloud has been “clipped” out of the latest processing result.When you are satisfied with the pass-through filter results, press
Ctrl+Cto kill the node. There is no need to close or kill the other terminals/nodes.
Plane Segmentation¶
This method is one of the most useful for any application where the object is on a flat surface. In order to isolate the objects on a table, you perform a plane fit to the points, which finds the points which comprise the table, and then subtract those points so that you are left with only points corresponding to the object(s) above the table. This is the most complicated PCL method we will be using and it is actually a combination of two: the RANSAC segmentation model, and the extract indices tool. An in depth example can be found on the PCL Plane Model Segmentation Tutorial; otherwise you can copy the below code snippet.
In
py_perception_node.cpp, update the switch statement infilterCallbackon line 191 to look as shown below:switch (request.operation) { case py_perception::FilterCloud::Request::VOXELGRID : { filtered_cloud = voxelGrid(cloud, 0.01); break; } case py_perception::FilterCloud::Request::PASSTHROUGH : { filtered_cloud = passThrough(cloud); break; } case py_perception::FilterCloud::Request::PLANESEGMENTATION : { filtered_cloud = planeSegmentation(cloud); break; } default : { ROS_ERROR("No valid request found"); return false; } }
Save your file. Build
python-pcl_wsworkspace by running the following commands.cd ~/python-pcl_ws catkin buildCopy paste the following code in
filter_call.py, after the passthrough filter section. Keep care to maintain indents:# ======================= # PLANE SEGMENTATION # ======================= srvp = rospy.ServiceProxy('filter_cloud', py_perception.srv.FilterCloud) req = py_perception.srv.FilterCloudRequest() req.pcdfilename = '' req.operation = py_perception.srv.FilterCloudRequest.PLANESEGMENTATION # FROM THE SERVICE, ASSIGN POINTS req.input_cloud = res_pass.output_cloud # PACKAGE THE FILTERED POINTCLOUD2 TO BE PUBLISHED res_seg = srvp(req) print('response received') if not res_voxel.success: raise Exception('Unsuccessful plane segmentation operation') # PUBLISH PLANESEGMENTATION FILTERED POINTCLOUD2 pub = rospy.Publisher('/perception_planeSegmentation', PointCloud2, queue_size=1, latch=True) pub.publish(res_seg.output_cloud) print("published: plane segmentation filter response")
Save and run from the terminal, repeating steps outlined for the voxel filter.
Within Rviz, compare PointCloud2 displays based on the
/kinect/depth_registered/points(original camera data) andperception_planeSegmentation(latest processing step) topics. Only points lying above the table plane remain in the latest processing result.When you are done viewing the results you can go back and change the
setMaxIterationsandsetDistanceThresholdparameter values to control how tightly the plane-fit classifies data as inliers/outliers, and view the results again. Try using values ofmaxIterations=100anddistThreshold=0.010When you are satisfied with the plane segmentation results, use
Ctrl+Cto kill the node. There is no need to close or kill the other terminals/nodes.
Euclidian Cluster Extraction¶
This method is useful for any application where there are multiple objects. This is also a complicated PCL method. An in depth example can be found on the PCL Euclidean Cluster Extraction Tutorial.
In
py_perception_node.cpp, update the switch statement infilterCallbackon line 191 to look as shown below:switch (request.operation) { case py_perception::FilterCloud::Request::VOXELGRID : { filtered_cloud = voxelGrid(cloud, 0.01); break; } case py_perception::FilterCloud::Request::PASSTHROUGH : { filtered_cloud = passThrough(cloud); break; } case py_perception::FilterCloud::Request::PLANESEGMENTATION : { filtered_cloud = planeSegmentation(cloud); break; } case py_perception::FilterCloud::Request::CLUSTEREXTRACTION : { std::vector<pcl::PointCloud<pcl::PointXYZ>::Ptr> temp =clusterExtraction(cloud); if (temp.size()>0) { filtered_cloud = temp[0]; } break; } default : { ROS_ERROR("No valid request found"); return false; } }
Save your file. Build
python-pcl_wsworkspace by running the following commands.cd ~/python-pcl_ws catkin buildCopy paste the following code in
filter_call.pyafter the plane segmentation section. Keep care to maintain indents:# ======================= # CLUSTER EXTRACTION # ======================= srvp = rospy.ServiceProxy('filter_cloud', py_perception.srv.FilterCloud) req = py_perception.srv.FilterCloudRequest() req.pcdfilename = '' req.operation = py_perception.srv.FilterCloudRequest.CLUSTEREXTRACTION # FROM THE SERVICE, ASSIGN POINTS req.input_cloud = res_seg.output_cloud # PACKAGE THE FILTERED POINTCLOUD2 TO BE PUBLISHED res_cluster = srvp(req) print('response received') if not res_voxel.success: raise Exception('Unsuccessful cluster extraction operation') # PUBLISH CLUSTEREXTRACTION FILTERED POINTCLOUD2 pub = rospy.Publisher('/perception_clusterExtraction', PointCloud2, queue_size=1, latch=True) pub.publish(res_cluster.output_cloud) print("published: cluster extraction filter response")
The python script,
filter_call.py, should now look like this:#!/usr/bin/env python import rospy import py_perception.srv from sensor_msgs.msg import PointCloud2 if __name__ == '__main__': try: rospy.init_node('filter_cloud', anonymous=True) rospy.wait_for_service('filter_cloud') # ======================= # VOXEL GRID FILTER # ======================= srvp = rospy.ServiceProxy('filter_cloud', py_perception.srv.FilterCloud) req = py_perception.srv.FilterCloudRequest() req.pcdfilename = rospy.get_param('~pcdfilename', '') req.operation = py_perception.srv.FilterCloudRequest.VOXELGRID # FROM THE SERVICE, ASSIGN POINTS req.input_cloud = PointCloud2() # ERROR HANDLING if req.pcdfilename == '': raise Exception('No file parameter found') # PACKAGE THE FILTERED POINTCLOUD2 TO BE PUBLISHED res_voxel = srvp(req) print('response received') if not res_voxel.success: raise Exception('Unsuccessful voxel grid filter operation') # PUBLISH VOXEL FILTERED POINTCLOUD2 pub = rospy.Publisher('/perception_voxelGrid', PointCloud2, queue_size=1, latch=True) pub.publish(res_voxel.output_cloud) print("published: voxel grid filter response") # ======================= # PASSTHROUGH FILTER # ======================= srvp = rospy.ServiceProxy('filter_cloud', py_perception.srv.FilterCloud) req = py_perception.srv.FilterCloudRequest() req.pcdfilename = '' req.operation = py_perception.srv.FilterCloudRequest.PASSTHROUGH # FROM THE SERVICE, ASSIGN POINTS req.input_cloud = res_voxel.output_cloud # PACKAGE THE FILTERED POINTCLOUD2 TO BE PUBLISHED res_pass = srvp(req) print('response received') if not res_voxel.success: raise Exception('Unsuccessful pass through filter operation') # PUBLISH PASSTHROUGH FILTERED POINTCLOUD2 pub = rospy.Publisher('/perception_passThrough', PointCloud2, queue_size=1, latch=True) pub.publish(res_pass.output_cloud) print("published: pass through filter response") # ======================= # PASSTHROUGH FILTER # ======================= srvp = rospy.ServiceProxy('filter_cloud', py_perception.srv.FilterCloud) req = py_perception.srv.FilterCloudRequest() req.pcdfilename = '' req.operation = py_perception.srv.FilterCloudRequest.PASSTHROUGH # FROM THE SERVICE, ASSIGN POINTS req.input_cloud = res_voxel.output_cloud # PACKAGE THE FILTERED POINTCLOUD2 TO BE PUBLISHED res_pass = srvp(req) print('response received') if not res_voxel.success: raise Exception('Unsuccessful pass through filter operation') # PUBLISH PASSTHROUGH FILTERED POINTCLOUD2 pub = rospy.Publisher('/perception_passThrough', PointCloud2, queue_size=1, latch=True) pub.publish(res_pass.output_cloud) print("published: pass through filter response") # ======================= # CLUSTER EXTRACTION # ======================= srvp = rospy.ServiceProxy('filter_cloud', py_perception.srv.FilterCloud) req = py_perception.srv.FilterCloudRequest() req.pcdfilename = '' req.operation = py_perception.srv.FilterCloudRequest.CLUSTEREXTRACTION # FROM THE SERVICE, ASSIGN POINTS req.input_cloud = res_seg.output_cloud # PACKAGE THE FILTERED POINTCLOUD2 TO BE PUBLISHED res_cluster = srvp(req) print('response received') if not res_voxel.success: raise Exception('Unsuccessful cluster extraction operation') # PUBLISH CLUSTEREXTRACTION FILTERED POINTCLOUD2 pub = rospy.Publisher('/perception_clusterExtraction', PointCloud2, queue_size=1, latch=True) pub.publish(res_cluster.output_cloud) print("published: cluster extraction filter response") rospy.spin() except Exception as e: print("Service call failed: %s" % str(e))
Save and run from the terminal, repeating steps outlined for the voxel filter.
When you are satisfied with the cluster extraction results, use
Ctrl+Cto kill the node. If you are done experimenting with this tutorial, you can kill the nodes running in the other terminals.
Future Study¶
The student is encouraged to convert Exercise 5.1 into callable functions and further refine the filtering operations.
Furthermore, for simplicity, the python code was repeated for each filtering instance. The student is encouraged to create a loop to handle the publishing instead of repeating large chunks of code. The student can also leverage the full functionality of the parameter handling instead of just using defaults, can set those from python. There are several more filtering operations not outlined here, if the student wants practice creating those function calls.