Skip to content
Snippets Groups Projects
Commit 317a7b12 authored by Younes Oubkis's avatar Younes Oubkis
Browse files

launch files

parent 693189b3
No related branches found
No related tags found
No related merge requests found
<launch>
<node name="manager" pkg="arbotix_aruco" type="manager.py" output="screen" ></node>
<node name="queueman" pkg="arbotix_aruco" type="queueman.py" output="screen" ></node>
</launch>
<launch>
<arg name="image_view" default="false" />
<node name="usb_cam" pkg="usb_cam" type="usb_cam_node" output="screen" >
<rosparam command="load" file="$(find usb_cam)/config/usb_cam.yml"/>
<param name="video_device" value="/dev/video4"/>
</node>
<node if="$(arg image_view)" name="image_view" pkg="image_view" type="image_view"
respawn="false" output="screen">
<remap from="image" to="/usb_cam/image_raw"/>
<param name="autosize" value="true" />
</node>
<node name="aruco_tracker" pkg="nodelet" type="nodelet"
args="standalone aruco_opencv/ArucoTracker" output="screen">
<rosparam command="load"
file="$(find aruco_opencv)/config/aruco_tracker.yaml" />
<param name="board_descriptions_path"
value="$(find aruco_opencv)/config/board_descriptions.yaml" />
</node>
</launch>
<?xml version="1.0"?>
<package format="2">
<name>arbotix_aruco</name>
<version>0.0.1</version>
<description>The aruco_manager package</description>
<maintainer email="aylin.aydin@stud.hs-bochum.de">Aylin Aydin</maintainer>
<maintainer email="younes.oubkis@stud.hs-bochum.de">Younes Oubkis</maintainer>
<!-- One license tag required, multiple allowed, one license per tag -->
<!-- Commonly used license strings: -->
<!-- BSD, MIT, Boost Software License, GPLv2, GPLv3, LGPLv2.1, LGPLv3 -->
<license>TODO</license>
<buildtool_depend>catkin</buildtool_depend>
<build_depend>rospy</build_depend>
<build_export_depend>rospy</build_export_depend>
<exec_depend>rospy</exec_depend>
<!-- The export tag contains other, unspecified, tags -->
<export>
<!-- Other tools can request additional information be placed here -->
</export>
</package>
#!/usr/bin/env python
import rospy
from sensor_msgs.msg import JointState
from std_msgs.msg import String, Header # Assuming the /aruco_detection topic publishes String messages
from aruco_opencv_msgs.msg import ArucoDetection, MarkerPose
from geometry_msgs.msg import Pose,Point, Quaternion
class marker_manager():
def __init__(self) -> None:
self.calibrated = False
self.calib_count = 0
self.markers = dict()
rospy.init_node('aruco_mapper_node', anonymous=True)
rospy.Subscriber('/aruco_detections', ArucoDetection, self.aruco_callback)
self.mapper = rospy.Publisher('/aruco_mapped_detections',JointState)
self.top_right_id = 395
self.bot_left_id = 163
rospy.loginfo("Waiting for Calibration...")
def aruco_callback(self,msg):
#rospy.loginfo("Received ArUco Detection: %s", msg.markers)
if(len(msg.markers) < 1):
return
for marker in msg.markers:
if(marker.marker_id == 0 or marker.marker_id == 1023):
continue
pose = marker.pose
if(marker.marker_id not in self.markers.keys()):
self.markers[marker.marker_id] = marker.pose
rospy.loginfo(self.markers.keys())
#rospy.loginfo(f'New Marker detected: Id: {marker.marker_id}\nPos:{pose.position}\nRot:{pose.orientation}')
#self.send_new_markers()
else:
self.markers[marker.marker_id] = pose
pass
k = self.markers.keys()
if(self.bot_left_id in k and self.top_right_id in k and (not self.calibrated or self.calib_count < 140) ):
self.calibrate()
def start(self):
while not rospy.is_shutdown():
if not self.calibrated:
rospy.sleep(1)
continue
pop_keys = []
for key in self.markers:
m = self.map(self.markers[key].position)
x,y = m
if(x > 500 or y > 500 or x < -500 or y < -500):
pop_keys.append(key)
continue
#self.mapper.publish()
js = JointState()
js.header = Header()
js.header.stamp = rospy.Time.now()
js.name = [str(key)]
js.position = [x,y]
self.mapper.publish(js)
for p in pop_keys:
self.markers.pop(p)
pop_keys.clear()
rospy.sleep(1)
def calibrate(self):
rospy.loginfo(f"Calibrating: {round(100.0 * self.calib_count/140.0,1)}%")
self.top_right = self.markers[self.top_right_id].position
self.bot_left = self.markers[self.bot_left_id].position
self.center_x = (self.top_right.x-self.bot_left.x)/2
self.center_z = (self.top_right.z-self.bot_left.z)/2
#rospy.loginfo(f"Center Position: {self.center_x},{self.center_z}")
self.calibration = self.cal((self.bot_left.x,self.bot_left.z),(self.top_right.x,self.top_right.z),(130,-180),(-130,180))
self.calib_count += 1
if(self.calib_count > 140):
self.calibrated = True
def cal(self,measured_point1,measured_point2,mapped_point1,mapped_point2):
x1, y1 = measured_point1
x2, y2 = measured_point2
x_mapped1, y_mapped1 = mapped_point1
x_mapped2, y_mapped2 = mapped_point2
# Calculate the slope and intercept for the linear transformation
if x2 - x1 != 0:
slope_x = (x_mapped2 - x_mapped1) / (x2 - x1)
else:
slope_x = float('inf')
if y2 - y1 != 0:
slope_y = (y_mapped2 - y_mapped1) / (y2 - y1)
else:
slope_y = float('inf')
intercept_x = x_mapped1 - slope_x * x1
intercept_y = y_mapped1 - slope_y * y1
return (slope_x, intercept_x, slope_y, intercept_y)
def map(self,measured_point):
x, y = measured_point.x,measured_point.z
slope_x, intercept_x, slope_y, intercept_y = self.calibration
mapped_x = slope_x * x + intercept_x
mapped_y = slope_y * y + intercept_y
return (mapped_x, mapped_y)
if __name__ == '__main__':
try:
man = marker_manager()
man.start()
except rospy.ROSInterruptException:
pass
#!/usr/bin/env python
import rospy
from sensor_msgs.msg import JointState
from std_msgs.msg import String, Header, Float64 # Assuming the /aruco_detection topic publishes String messages
from aruco_opencv_msgs.msg import ArucoDetection, MarkerPose
from geometry_msgs.msg import Pose,Point, Quaternion
import numpy as np
class queue_manager():
def __init__(self):
self.markers = dict()
rospy.init_node('aruco_arbotix_node', anonymous=True)
self.queue = []
self.finished = []
self.current = None
self.ready = True
rospy.Subscriber("/aruco_mapped_detections",JointState,self.marker_cb)
rospy.Subscriber("/joint_states",JointState,self.robot_state_cb)
self.publisher = rospy.Publisher("/phantomx_reactor_controller/set_position_inv",JointState,queue_size=10)
self.reset_pub = rospy.Publisher("/phantomx_reactor_controller/reset",Float64,queue_size=10)
def marker_cb(self,msg):
if(msg.name[0] not in self.markers.keys()):
self.markers[msg.name[0]] = msg.position
self.queue.append(msg.position)
rospy.loginfo(self.queue)
def robot_state_cb(self,msg):
vel = np.array(msg.velocity)
v = np.linalg.norm(vel)
rospy.loginfo(v)
if(v > 0.5):
rospy.loginfo("in_movement")
self.ready = False
else:
if(not self.ready):
self.reset_pub.publish(0)
rospy.sleep(5)
self.ready = True
rospy.loginfo("idle")
def publish_robo_pos(self,pos):
pos = list(pos)
pos.append(15)
js = JointState()
js.header = Header()
js.header.stamp = rospy.Time.now()
js.name = ["Mapped Pos"]
js.position = pos
self.publisher.publish(js)
def start(self):
while not rospy.is_shutdown():
rospy.sleep(1)
if self.ready:
if(len(self.queue) > 0):
if self.current is not None:
self.finished.append(self.current)
self.current = self.queue.pop()
self.ready = False
self.publish_robo_pos(self.current)
else:
rospy.loginfo("queue empty")
if __name__ == '__main__':
try:
man = queue_manager()
man.start()
except rospy.ROSInterruptException:
pass
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment