2. Cyber RT Python

2.1. 1. Background

The core functions of Cyber RT are implemented in C++. Python interfaces are also provided to help developers build their own utilities for specific projects.

2.2. 2. Cyber RT Python Interfaces

The Python interfaces of Cyber RT are wrappers around the corresponding C++ interfaces. The implementation doesn’t rely on other third-party tools, e.g. swig, which makes it easier to maintain.

2.3. 3. Overview of Python Interfaces in Cyber RT

So far, the Python interfaces cover:

  • access the information of channels

  • server/client communication

  • query information in record files

  • read and write from/to record files

  • Time/Duration/Rate related operations

  • Timer

2.3.1. 3.1 Read/Write of Channels

The steps are as follows:

  1. First create a Node;

  2. Create corresponding reader or writer;

  3. To write to a channel, use the write interface of the writer;

  4. To read from a channel, use the spin interface of the node, and process the messages in your callback function.

The interfaces are shown below:

class Node:
    """
    Class for cyber Node wrapper.
    """

    def create_writer(self, name, data_type, qos_depth=1):
        """
        create a topic writer for send message to topic.
        @param self
        @param name str: topic name
        @param data_type proto: message class for serialization
        @param qos_depth int: QoS depth of the writer, defaults to 1
        """

    def create_reader(self, name, data_type, callback, args=None):
        """
        create a topic reader for receive message from topic.
        @param self
        @param name str: topic name
        @param data_type proto: message class for serialization
        @param callback fn: function to call (fn(data)) when data is
                   received. If args is set, the function must
                   accept the args as a second argument,
                   i.e. fn(data, args)
        @param args any: additional arguments to pass to the callback
        """

    def create_client(self, name, request_data_type, response_data_type):
        """
        create a client for server/client communication.
        @param self
        @param name str: service name
        @param request_data_type proto: message class of the request
        @param response_data_type proto: message class of the response
        """

    def create_service(self, name, req_data_type, res_data_type, callback,
                       args=None):
        """
        create a service for server/client communication.
        @param self
        @param name str: service name
        @param req_data_type proto: message class of the request
        @param res_data_type proto: message class of the response
        @param callback fn: function that handles a request
                   (presumably called once per incoming request;
                   confirm against the implementation)
        @param args any: additional arguments for the callback
                   (presumably forwarded the same way as in
                   create_reader; confirm against the implementation)
        """

    def spin(self):
        """
        spin in wait and process message.
        @param self
        """

class Writer(object):
    """
    Class for cyber writer wrapper.
    """
    def write(self, data):
        """
        write a message to the channel this writer was created for.
        @param self
        @param data: the message to send, e.g. an instance of the
                   message class given to Node.create_writer
        """

2.3.2. 3.2 Record Interfaces

Read from record:

  1. Create a RecordReader;

  2. Read messages from Record;

Write to record:

  1. Create a RecordWriter

  2. Write messages to record;

The interfaces are shown below:

class RecordReader(object):
    """
    Class for cyber RecordReader wrapper.
    """

    def read_messages(self, start_time=0, end_time=18446744073709551615):
        """
        read message from bag file.
        @param self
        @param start_time: start of the time range to read, defaults to 0
        @param end_time: end of the time range to read, defaults to
                   18446744073709551615 (2**64 - 1)
        @return: generator of (channel_name, message, data_type, timestamp),
                   where message is the serialized message
        """

    def get_messagenumber(self, channel_name):
        """
        return message count of the given channel.
        """

    def get_messagetype(self, channel_name):
        """
        return message type of the given channel.
        """

    def get_protodesc(self, channel_name):
        """
        return message protodesc of the given channel.
        """

    def get_headerstring(self):
        """
        return message header string.
        """

    def reset(self):
        """
        reset the record reader and return the result of the reset call.
        """
        return _CYBER_RECORD.PyRecordReader_Reset(self.record_reader)

    def get_channellist(self):
        """
        return channel list.
        """
        return _CYBER_RECORD.PyRecordReader_GetChannelList(self.record_reader)


class RecordWriter(object):
    """
    Class for cyber RecordWriter wrapper.
    """

    def open(self, path):
        """
        open record file for write.
        @param path: path of the record file
        @return: a true value on success, a false value on failure
        """

    def close(self):
        """
        close record file.
        """

    def write_channel(self, channel_name, type_name, proto_desc):
        """
        writer channel by channelname,typename,protodesc
        """

    def write_message(self, channel_name, data, time, raw=True):
        """
        writer msg:channelname,data,time,is data raw
        @param raw: True (default) when data is an already serialized
                   message, False when data is a message object
        """

    def set_size_fileseg(self, size_kilobytes):
        """
        set the file segmentation size, in kilobytes.
        """

    def set_intervaltime_fileseg(self, time_sec):
        """
        set the file segmentation time interval, in seconds.
        """

    def get_messagenumber(self, channel_name):
        """
        return message count.
        """

    def get_messagetype(self, channel_name):
        """
        return message type.
        """

    def get_protodesc(self, channel_name):
        """
        return message protodesc.
        """

2.3.3. 3.3 Time Interfaces

class Time(object):
    """
    Class for cyber time wrapper.
    """

    @staticmethod
    def now():
        """
        return a Time object built from PyTime_now().
        """
        return Time(_CYBER_TIME.PyTime_now())

    @staticmethod
    def mono_time():
        """
        return a Time object built from PyTime_mono_time().
        """
        return Time(_CYBER_TIME.PyTime_mono_time())

    def to_sec(self):
        """
        return this time converted by PyTime_to_sec (seconds).
        """
        return _CYBER_TIME.PyTime_to_sec(self.time)

    def to_nsec(self):
        """
        return this time converted by PyTime_to_nsec (nanoseconds).
        """
        return _CYBER_TIME.PyTime_to_nsec(self.time)

    def sleep_until(self, nanoseconds):
        """
        call PyTime_sleep_until with this time and the given nanoseconds,
        and return its result.
        """
        return _CYBER_TIME.PyTime_sleep_until(self.time, nanoseconds)

2.3.4. 3.4 Timer Interfaces

class Timer(object):
    """
    Class for cyber timer wrapper.
    """

    def set_option(self, period, callback, oneshot=0):
        """
        set the option of timer.
        @param period The period of the timer, unit is ms.
        @param callback The tasks that the timer needs to perform.
        @param oneshot 1:perform the callback only after the first timing cycle
        0:perform the callback every timed period
        """

    def start(self):
        """
        start the timer.
        """

    def stop(self):
        """
        stop the timer.
        """

2.4. 4. Examples

2.4.1. 4.1 Read from Channel (in cyber/python/cyber_py3/examples/listener.py)

from cyber_py3 import cyber
from cyber.proto.unit_test_pb2 import ChatterBenchmark


def callback(data):
    """
    Print a received message between two separator lines.
    """
    separator = "=" * 80
    print(separator)
    print("py:reader callback msg->:")
    print(data)
    print(separator)


def test_listener_class():
    """
    Create the "listener" node, attach a reader to "channel/chatter" and spin.
    """
    banner = "=" * 120
    print(banner)
    listener_node = cyber.Node("listener")
    listener_node.create_reader("channel/chatter", ChatterBenchmark, callback)
    listener_node.spin()


if __name__ == '__main__':
    # Script entry point: initialize cyber, run the listener, then shut
    # cyber down once test_listener_class() returns.
    cyber.init()
    test_listener_class()
    cyber.shutdown()

2.4.2. 4.2 Write to Channel (in cyber/python/cyber_py3/examples/talker.py)

import time

from cyber_py3 import cyber
from cyber.proto.unit_test_pb2 import ChatterBenchmark


def test_talker_class():
    """
    Publish a ChatterBenchmark message on "channel/chatter" once per second
    until cyber is shut down.
    """
    msg = ChatterBenchmark()
    msg.content = "py:talker:send Alex!"
    msg.stamp = 9999
    msg.seq = 0
    print(msg)

    talker_node = cyber.Node("node_name1")
    seq = 1
    writer = talker_node.create_writer("channel/chatter", ChatterBenchmark, 6)

    while not cyber.is_shutdown():
        time.sleep(1)
        # The sequence number is bumped before each write, so the first
        # published message carries seq == 2.
        seq += 1
        msg.seq = seq
        msg.content = "I am python talker."
        print("=" * 80)
        print("write msg -> %s" % msg)
        writer.write(msg)


if __name__ == '__main__':
   # Script entry point: initialize cyber (passing "talker_sample"), run the
   # talker loop, then shut cyber down once the loop returns.
   cyber.init("talker_sample")
   test_talker_class()
   cyber.shutdown()

2.4.3. 4.3 Read and Write Messages from/to Record File (in cyber/python/cyber_py3/examples/record.py)

"""
Module for example of record.
Run with:
    bazel run //cyber/python/cyber_py3/examples:record
"""

import time

from google.protobuf.descriptor_pb2 import FileDescriptorProto

from cyber.proto.unit_test_pb2 import Chatter
from cyber.python.cyber_py3 import record
from modules.common.util.testdata.simple_pb2 import SimpleMessage


# Type name that test_record_reader compares with a record message's data
# type to decide that the message must be parsed as a SimpleMessage.
MSG_TYPE = "apollo.common.util.test.SimpleMessage"
# Same as above, for messages that must be parsed as a Chatter.
MSG_TYPE_CHATTER = "apollo.cyber.proto.Chatter"


def _serialize_file_descriptor(msg):
    """
    Return the serialized FileDescriptorProto of the file that defines msg.
    """
    file_desc = msg.DESCRIPTOR.file
    proto = FileDescriptorProto()
    file_desc.CopyToProto(proto)
    proto.name = file_desc.name
    return proto.SerializeToString()


def test_record_writer(writer_path):
    """
    Record writer.

    Write two SimpleMessage and two Chatter messages to the record file at
    writer_path. Prints an error and returns early when the file cannot be
    opened; once it is open, the writer is always closed, even if a write
    raises.
    """
    fwriter = record.RecordWriter()
    fwriter.set_size_fileseg(0)
    fwriter.set_intervaltime_fileseg(0)

    if not fwriter.open(writer_path):
        print('Failed to open record writer!')
        return
    print('+++ Begin to writer +++')

    try:
        # Write 2 SimpleMessage: one passed as a message object (raw=False),
        # one passed already serialized (raw defaults to True).
        msg = SimpleMessage()
        msg.text = "AAAAAA"

        desc_str = _serialize_file_descriptor(msg)
        print(msg.DESCRIPTOR.full_name)
        fwriter.write_channel(
            'simplemsg_channel', msg.DESCRIPTOR.full_name, desc_str)
        fwriter.write_message('simplemsg_channel', msg, 990, False)
        fwriter.write_message(
            'simplemsg_channel', msg.SerializeToString(), 991)

        # Write 2 Chatter, again one as an object and one serialized.
        msg = Chatter()
        msg.timestamp = 99999
        msg.lidar_timestamp = 100
        msg.seq = 1

        desc_str = _serialize_file_descriptor(msg)
        print(msg.DESCRIPTOR.full_name)
        fwriter.write_channel('chatter_a', msg.DESCRIPTOR.full_name, desc_str)
        fwriter.write_message('chatter_a', msg, 992, False)
        msg.seq = 2
        fwriter.write_message("chatter_a", msg.SerializeToString(), 993)
    finally:
        # Close the writer on every exit path so the record file is not
        # left open when one of the writes above fails.
        fwriter.close()


def test_record_reader(reader_path):
    """
    Read every message of the record file at reader_path, print its
    metadata and, for the known message types, the parsed message.
    """
    # Checked in order; the first matching type name selects the parser.
    known_types = (
        (MSG_TYPE, SimpleMessage),
        (MSG_TYPE_CHATTER, Chatter),
    )

    freader = record.RecordReader(reader_path)
    time.sleep(1)
    print('+' * 80)
    print('+++ Begin to read +++')

    messages = freader.read_messages()
    for count, (channel_name, msg, datatype, timestamp) in enumerate(
            messages, 1):
        print('=' * 80)
        print('read [%d] messages' % count)
        print('channel_name -> %s' % channel_name)
        print('msgtime -> %d' % timestamp)
        print('msgnum -> %d' % freader.get_messagenumber(channel_name))
        print('msgtype -> %s' % datatype)
        print('message is -> %s' % msg)
        print('***After parse(if needed),the message is ->')
        for type_name, msg_class in known_types:
            if datatype == type_name:
                msg_new = msg_class()
                msg_new.ParseFromString(msg)
                print(msg_new)
                break


if __name__ == '__main__':
    # Script entry point: write a sample record file, then read it back.
    test_record_file = "/tmp/test_writer.record"

    print('Begin to write record file: {}'.format(test_record_file))
    test_record_writer(test_record_file)

    # The reader is pointed at the same path the writer just produced.
    print('Begin to read record file: {}'.format(test_record_file))
    test_record_reader(test_record_file)