Cyber RT API for Developers¶
This document provides an extensive technical deep dive into how to create, manipulate and use Cyber RT’s API.
Table of Contents¶
Talker-Listener¶
The first part of demonstrating CyberRT API is to understand the Talker/Listener example. Following are three essential concepts: node (basic unit), reader(facility to read message) and writer(facility to write message) of the example.
Create a node¶
In the CyberRT framework, the node is the most fundamental unit, similar to the role of a handle
. When creating a specific functional object (writer, reader, etc.), you need to create it based on an existing node instance.
The node creation interface is as follows:
std::unique_ptr<Node> apollo::cyber::CreateNode(const std::string& node_name, const std::string& name_space = "");
Parameters:
node_name: name of the node, globally unique identifier
name_space: name of the space where the node is located
name_space is empty by default. It is the name of the space concatenated with node_name. The format is
/namespace/node_name
Return value - An exclusive smart pointer to Node
Error Conditions - when
cyber::Init()
has not called, the system is in an uninitialized state, unable to create a node, return nullptr
Create a writer¶
The writer is the basic facility used in CyberRT to send messages. Every writer corresponds to a channel with a specific data type.
The writer is created by the CreateWriter
interface in the node class. The interfaces are listed as below:
template <typename MessageT>
auto CreateWriter(const std::string& channel_name)
-> std::shared_ptr<Writer<MessageT>>;
template <typename MessageT>
auto CreateWriter(const proto::RoleAttributes& role_attr)
-> std::shared_ptr<Writer<MessageT>>;
Parameters:
channel_name: the name of the channel to write to
MessageT: The type of message to be written out
Return value - Shared pointer to the Writer object
Create a reader¶
The reader is the basic facility used in cyber to receive messages. Reader has to be bound to a callback function when it is created. When a new message arrives in the channel, the callback will be called.
The reader is created by the CreateReader
interface of the node class. The interfaces are listed as below:
template <typename MessageT>
auto CreateReader(const std::string& channel_name, const std::function<void(const std::shared_ptr<MessageT>&)>& reader_func)
-> std::shared_ptr<Reader<MessageT>>;
template <typename MessageT>
auto CreateReader(const ReaderConfig& config,
const CallbackFunc<MessageT>& reader_func = nullptr)
-> std::shared_ptr<cyber::Reader<MessageT>>;
template <typename MessageT>
auto CreateReader(const proto::RoleAttributes& role_attr,
const CallbackFunc<MessageT>& reader_func = nullptr)
-> std::shared_ptr<cyber::Reader<MessageT>>;
Parameters:
MessageT: The type of message to read
channel_name: the name of the channel to receive from
reader_func: callback function to process the messages
Return value - Shared pointer to the Reader object
Code Example¶
Talker (cyber/examples/talker.cc)¶
#include "cyber/cyber.h"
#include "cyber/proto/chatter.pb.h"
#include "cyber/time/rate.h"
#include "cyber/time/time.h"
using apollo::cyber::Rate;
using apollo::cyber::Time;
using apollo::cyber::proto::Chatter;
int main(int argc, char *argv[]) {
// init cyber framework
apollo::cyber::Init(argv[0]);
// create talker node
std::shared_ptr<apollo::cyber::Node> talker_node(
apollo::cyber::CreateNode("talker"));
// create talker
auto talker = talker_node->CreateWriter<Chatter>("channel/chatter");
Rate rate(1.0);
while (apollo::cyber::OK()) {
static uint64_t seq = 0;
auto msg = std::make_shared<apollo::cyber::proto::Chatter>();
msg->set_timestamp(Time::Now().ToNanosecond());
msg->set_lidar_timestamp(Time::Now().ToNanosecond());
msg->set_seq(seq++);
msg->set_content("Hello, apollo!");
talker->Write(msg);
AINFO << "talker sent a message!";
rate.Sleep();
}
return 0;
}
Listener (cyber/examples/listener.cc)¶
#include "cyber/cyber.h"
#include "cyber/proto/chatter.pb.h"
void MessageCallback(
const std::shared_ptr<apollo::cyber::proto::Chatter>& msg) {
AINFO << "Received message seq-> " << msg->seq();
AINFO << "msgcontent->" << msg->content();
}
int main(int argc, char *argv[]) {
// init cyber framework
apollo::cyber::Init(argv[0]);
// create listener node
auto listener_node = apollo::cyber::CreateNode("listener");
// create listener
auto listener =
listener_node->CreateReader<apollo::cyber::proto::Chatter>(
"channel/chatter", MessageCallback);
apollo::cyber::WaitForShutdown();
return 0;
}
Bazel BUILD file(cyber/samples/BUILD)¶
cc_binary(
name = "talker",
srcs = [ "talker.cc", ],
deps = [
"//cyber",
"//cyber/examples/proto:examples_cc_proto",
],
)
cc_binary(
name = "listener",
srcs = [ "listener.cc", ],
deps = [
"//cyber",
"//cyber/examples/proto:examples_cc_proto",
],
)
Build and Run¶
Build: bazel build cyber/examples/…
Run talker/listener in different terminals:
./bazel-bin/cyber/examples/talker
./bazel-bin/cyber/examples/listener
Examine the results: you should see message printing out on listener.
Service Creation and Use¶
Introduction¶
In an autonomous driving system, there are many scenarios that require more from module communication than just sending or receiving messages. Service is another way of communication between nodes. Unlike channel, service implements two-way
communication, e.g. a node obtains a response by sending a request. This section introduces the service
module in CyberRT API with examples.
Demo - Example¶
Problem: create a client-server model that pass Driver.proto back and forth. When a request is sent in by the client, the server parses/processes the request and returns the response.
The implementation of the demo mainly includes the following steps.
Define request and response messages¶
All messages in cyber are in the protobuf
format. Any protobuf message with serialize/deserialize interfaces can be used as the service request and response message. Driver
in examples.proto is used as service request and response in this example:
// filename: examples.proto
syntax = "proto2";
package apollo.cyber.examples.proto;
message Driver {
optional string content = 1;
optional uint64 msg_id = 2;
optional uint64 timestamp = 3;
};
Create a service and a client¶
// filename: cyber/examples/service.cc
#include "cyber/cyber.h"
#include "cyber/examples/proto/examples.pb.h"
using apollo::cyber::examples::proto::Driver;
int main(int argc, char* argv[]) {
apollo::cyber::Init(argv[0]);
std::shared_ptr<apollo::cyber::Node> node(
apollo::cyber::CreateNode("start_node"));
auto server = node->CreateService<Driver, Driver>(
"test_server", [](const std::shared_ptr<Driver>& request,
std::shared_ptr<Driver>& response) {
AINFO << "server: I am driver server";
static uint64_t id = 0;
++id;
response->set_msg_id(id);
response->set_timestamp(0);
});
auto client = node->CreateClient<Driver, Driver>("test_server");
auto driver_msg = std::make_shared<Driver>();
driver_msg->set_msg_id(0);
driver_msg->set_timestamp(0);
while (apollo::cyber::OK()) {
auto res = client->SendRequest(driver_msg);
if (res != nullptr) {
AINFO << "client: response: " << res->ShortDebugString();
} else {
AINFO << "client: service may not ready.";
}
sleep(1);
}
apollo::cyber::WaitForShutdown();
return 0;
}
Bazel build file¶
cc_binary(
name = "service",
srcs = [ "service.cc", ],
deps = [
"//cyber",
"//cyber/examples/proto:examples_cc_proto",
],
)
Build and run¶
Build service/client: bazel build cyber/examples/…
Run: ./bazel-bin/cyber/examples/service
Examining result: you should see content below in apollo/data/log/service.INFO
I1124 16:36:44.568845 14965 service.cc:30] [service] server: i am driver server
I1124 16:36:44.569031 14949 service.cc:43] [service] client: response: msg_id: 1 timestamp: 0
I1124 16:36:45.569514 14966 service.cc:30] [service] server: i am driver server
I1124 16:36:45.569932 14949 service.cc:43] [service] client: response: msg_id: 2 timestamp: 0
I1124 16:36:46.570627 14967 service.cc:30] [service] server: i am driver server
I1124 16:36:46.571024 14949 service.cc:43] [service] client: response: msg_id: 3 timestamp: 0
I1124 16:36:47.571566 14968 service.cc:30] [service] server: i am driver server
I1124 16:36:47.571962 14949 service.cc:43] [service] client: response: msg_id: 4 timestamp: 0
I1124 16:36:48.572634 14969 service.cc:30] [service] server: i am driver server
I1124 16:36:48.573030 14949 service.cc:43] [service] client: response: msg_id: 5 timestamp: 0
Precautions¶
When registering a service, note that duplicate service names are not allowed
The node name applied when registering the server and client should not be duplicated either
Parameter Service¶
The Parameter Service is used for shared data between nodes, and provides basic operations such as set
, get
, and list
. The Parameter Service is based on the Service
implementation and contains service and client.
Parameter Object¶
Supported Data types¶
All parameters passed through cyber are apollo::cyber::Parameter
objects, the table below lists the 5 supported parameter types.
Parameter type |
C++ data type |
protobuf data type |
---|---|---|
apollo::cyber::proto::ParamType::INT |
int64_t |
int64 |
apollo::cyber::proto::ParamType::DOUBLE |
double |
double |
apollo::cyber::proto::ParamType::BOOL |
bool |
bool |
apollo::cyber::proto::ParamType::STRING |
std::string |
string |
apollo::cyber::proto::ParamType::PROTOBUF |
std::string |
string |
apollo::cyber::proto::ParamType::NOT_SET |
- |
- |
Besides the 5 types above, Parameter also supports interface with protobuf object as incoming parameter. Post performing serialization processes the object and converts it to the STRING type for transfer.
Creating the Parameter Object¶
Supported constructors:
Parameter(); // Name is empty, type is NOT_SET
explicit Parameter(const Parameter& parameter);
explicit Parameter(const std::string& name); // type为NOT_SET
Parameter(const std::string& name, const bool bool_value);
Parameter(const std::string& name, const int int_value);
Parameter(const std::string& name, const int64_t int_value);
Parameter(const std::string& name, const float double_value);
Parameter(const std::string& name, const double double_value);
Parameter(const std::string& name, const std::string& string_value);
Parameter(const std::string& name, const char* string_value);
Parameter(const std::string& name, const std::string& msg_str,
const std::string& full_name, const std::string& proto_desc);
Parameter(const std::string& name, const google::protobuf::Message& msg);
Sample code of using Parameter object:
Parameter a("int", 10);
Parameter b("bool", true);
Parameter c("double", 0.1);
Parameter d("string", "cyber");
Parameter e("string", std::string("cyber"));
// proto message Chatter
Chatter chatter;
Parameter f("chatter", chatter);
std::string msg_str("");
chatter.SerializeToString(&msg_str);
std::string msg_desc("");
ProtobufFactory::GetDescriptorString(chatter, &msg_desc);
Parameter g("chatter", msg_str, Chatter::descriptor()->full_name(), msg_desc);
Interface and Data Reading¶
Interface list:
inline ParamType type() const;
inline std::string TypeName() const;
inline std::string Descriptor() const;
inline const std::string Name() const;
inline bool AsBool() const;
inline int64_t AsInt64() const;
inline double AsDouble() const;
inline const std::string AsString() const;
std::string DebugString() const;
template <typename Type>
typename std::enable_if<std::is_base_of<google::protobuf::Message, Type>::value, Type>::type
value() const;
template <typename Type>
typename std::enable_if<std::is_integral<Type>::value && !std::is_same<Type, bool>::value, Type>::type
value() const;
template <typename Type>
typename std::enable_if<std::is_floating_point<Type>::value, Type>::type
value() const;
template <typename Type>
typename std::enable_if<std::is_convertible<Type, std::string>::value, const std::string&>::type
value() const;
template <typename Type>
typename std::enable_if<std::is_same<Type, bool>::value, bool>::type
value() const;
An example of how to use those interfaces:
Parameter a("int", 10);
a.Name(); // return int
a.Type(); // return apollo::cyber::proto::ParamType::INT
a.TypeName(); // return string: INT
a.DebugString(); // return string: {name: "int", type: "INT", value: 10}
int x = a.AsInt64(); // x = 10
x = a.value<int64_t>(); // x = 10
x = a.AsString(); // Undefined behavior, error log prompt
f.TypeName(); // return string: chatter
auto chatter = f.value<Chatter>();
Parameter Service¶
If a node wants to provide a Parameter Service to other nodes, then you need to create a ParameterService
.
/**
* @brief Construct a new ParameterService object
*
* @param node shared_ptr of the node handler
*/
explicit ParameterService(const std::shared_ptr<Node>& node);
Since all parameters are stored in the parameter service object, the parameters can be manipulated directly in the ParameterService without sending a service request.
Setting parameters:
/**
* @brief Set the Parameter object
*
* @param parameter parameter to be set
*/
void SetParameter(const Parameter& parameter);
Getting parameters:
/**
* @brief Get the Parameter object
*
* @param param_name
* @param parameter the pointer to store
* @return true
* @return false call service fail or timeout
*/
bool GetParameter(const std::string& param_name, Parameter* parameter);
Getting the list of parameters:
/**
* @brief Get all the Parameter objects
*
* @param parameters pointer of vector to store all the parameters
* @return true
* @return false call service fail or timeout
*/
bool ListParameters(std::vector<Parameter>* parameters);
Parameter Client¶
If a node wants to use parameter services of other nodes, you need to create a ParameterClient
.
/**
* @brief Construct a new ParameterClient object
*
* @param node shared_ptr of the node handler
* @param service_node_name node name which provide a param services
*/
ParameterClient(const std::shared_ptr<Node>& node, const std::string& service_node_name);
You could also perform SetParameter
, GetParameter
and ListParameters
mentioned under Parameter Service.
Demo - example¶
#include "cyber/cyber.h"
#include "cyber/parameter/parameter_client.h"
#include "cyber/parameter/parameter_server.h"
using apollo::cyber::Parameter;
using apollo::cyber::ParameterServer;
using apollo::cyber::ParameterClient;
int main(int argc, char** argv) {
apollo::cyber::Init(*argv);
std::shared_ptr<apollo::cyber::Node> node =
apollo::cyber::CreateNode("parameter");
auto param_server = std::make_shared<ParameterServer>(node);
auto param_client = std::make_shared<ParameterClient>(node, "parameter");
param_server->SetParameter(Parameter("int", 1));
Parameter parameter;
param_server->GetParameter("int", ¶meter);
AINFO << "int: " << parameter.AsInt64();
param_client->SetParameter(Parameter("string", "test"));
param_client->GetParameter("string", ¶meter);
AINFO << "string: " << parameter.AsString();
param_client->GetParameter("int", ¶meter);
AINFO << "int: " << parameter.AsInt64();
return 0;
}
Build and run¶
Build: bazel build cyber/examples/…
Run: ./bazel-bin/cyber/examples/paramserver
Log API¶
Log library¶
Cyber log library is built on top of glog. The following header files need to be included:
#include "cyber/common/log.h"
#include "cyber/init.h"
Log configuration¶
Default global config path: cyber/setup.bash
The configs below could be modified by devloper:
export GLOG_log_dir=/apollo/data/log
export GLOG_alsologtostderr=0
export GLOG_colorlogtostderr=1
export GLOG_minloglevel=0
Log initialization¶
Call the Init method at the code entry to initialize the log:
apollo::cyber::cyber::Init(argv[0]) is initialized.
If no macro definition is made in the previous component, the corresponding log is printed to the binary log.
Log output macro¶
Log library is encapsulated in Log printing macros. The related log macros are used as follows:
ADEBUG << "hello cyber.";
AINFO << "hello cyber.";
AWARN << "hello cyber.";
AERROR << "hello cyber.";
AFATAL << "hello cyber.";
Log format¶
The format is <MODULE_NAME>.log.<LOG_LEVEL>.<datetime>.<process_id>
About log files¶
Currently, the only different output behavior from default glog is that different log levels of a module will be written into the same log file.
Building a module based on Component¶
Key concepts¶
1. Component¶
The component is the base class that Cyber RT provides to build application modules. Each specific application module can inherit the Component class and define its own Init
and Proc
functions so that it can be loaded into the Cyber framework.
2. Binary vs Component¶
There are two options to use Cyber RT framework for applications:
Binary based: the application is compiled separately into a binary, which communicates with other cyber modules by creating its own
Reader
andWriter
.Component based: the application is compiled into a Shared Library. By inheriting the Component class and writing the corresponding dag description file, the Cyber RT framework will load and run the application dynamically.
The essential Component interface¶
The component’s
Init()
function is like the main function that does some initialization of the algorithm.Component’s
Proc()
function works like Reader’s callback function that is called by the framework when a message arrives.
Advantages of using Component¶
Component can be loaded into different processes through the launch file, and the deployment is flexible.
Component can change the received channel name by modifying the dag file without recompiling.
Component supports receiving multiple types of data.
Component supports providing multiple fusion strategies.
3. Dag file format¶
An example dag file:
# Define all coms in DAG streaming.
module_config {
module_library : "lib/libperception_component.so"
components {
class_name : "PerceptionComponent"
config {
name : "perception"
readers {
channel: "perception/channel_name"
}
}
}
timer_components {
class_name : "DriverComponent"
config {
name : "driver"
interval : 100
}
}
}
module_library: If you want to load the .so library the root directory is the working directory of cyber (the same directory of
setup.bash
)components & timer_component: Select the base component class type that needs to be loaded.
class_name: the name of the component class to load
name: the loaded class_name as the identifier of the loading example
readers: Data received by the current component, supporting 1-3 channels of data.
Demo - examples¶
Common_component_example(cyber/examples/common_component_example/*)¶
Header definition(common_component_example.h)
#include <memory>
#include "cyber/class_loader/class_loader.h"
#include "cyber/component/component.h"
#include "cyber/examples/proto/examples.pb.h"
using apollo::cyber::examples::proto::Driver;
using apollo::cyber::Component;
using apollo::cyber::ComponentBase;
class Commontestcomponent : public Component<Driver, Driver> {
public:
bool Init() override;
bool Proc(const std::shared_ptr<Driver>& msg0,
const std::shared_ptr<Driver>& msg1) override;
};
CYBER_REGISTER_COMPONENT(Commontestcomponent)
Cpp file implementation(common_component_example.cc)
#include "cyber/examples/common_component_smaple/common_component_example.h"
#include "cyber/class_loader/class_loader.h"
#include "cyber/component/component.h"
bool Commontestcomponent::Init() {
AINFO << "Commontest component init";
return true;
}
bool Commontestcomponent::Proc(const std::shared_ptr<Driver>& msg0,
const std::shared_ptr<Driver>& msg1) {
AINFO << "Start commontest component Proc [" << msg0->msg_id() << "] ["
<< msg1->msg_id() << "]";
return true;
}
Timer_component_example(cyber/examples/timer_component_example/*)¶
Header definition(timer_component_example.h)
#include <memory>
#include "cyber/class_loader/class_loader.h"
#include "cyber/component/component.h"
#include "cyber/component/timer_component.h"
#include "cyber/examples/proto/examples.pb.h"
using apollo::cyber::examples::proto::Driver;
using apollo::cyber::Component;
using apollo::cyber::ComponentBase;
using apollo::cyber::TimerComponent;
using apollo::cyber::Writer;
class TimertestComponent : public TimerComponent {
public:
bool Init() override;
bool Proc() override;
private:
std::shared_ptr<Writer<Driver>> driver_writer_ = nullptr;
};
CYBER_REGISTER_COMPONENT(TimertestComponent)
Cpp file implementation(timer_component_example.cc)
#include "cyber/examples/timer_component_example/timer_component_example.h"
#include "cyber/class_loader/class_loader.h"
#include "cyber/component/component.h"
#include "cyber/examples/proto/examples.pb.h"
bool TimertestComponent::Init() {
driver_writer_ = node_->CreateWriter<Driver>("/carstatus/channel");
return true;
}
bool TimertestComponent::Proc() {
static int i = 0;
auto out_msg = std::make_shared<Driver>();
out_msg->set_msg_id(i++);
driver_writer_->Write(out_msg);
AINFO << "timertestcomponent: Write drivermsg->"
<< out_msg->ShortDebugString();
return true;
}
Build and run¶
Use timertestcomponent as example:
Build: bazel build cyber/examples/timer_component_smaple/…
Run: mainboard -d cyber/examples/timer_component_smaple/timer.dag
Precautions¶
Component needs to be registered to load the class through SharedLibrary. The registration interface looks like:
CYBER_REGISTER_COMPONENT(DriverComponent)
If you use a namespace when registering, you also need to add a namespace when you define it in the dag file.
The configuration files of the Component and TimerComponent are different, please be careful not to mix the two up.
Launch¶
cyber_launch is the launcher of the Cyber RT framework. It starts multiple mainboards according to the launch file, and loads different components into different mainboards according to the dag file. cyber_launch supports two scenarios for dynamically loading components or starting Binary programs in a child process.
Launch File Format¶
<cyber>
<module>
<name>driver</name>
<dag_conf>driver.dag</dag_conf>
<process_name></process_name>
<exception_handler>exit</exception_handler>
</module>
<module>
<name>perception</name>
<dag_conf>perception.dag</dag_conf>
<process_name></process_name>
<exception_handler>respawn</exception_handler>
</module>
<module>
<name>planning</name>
<dag_conf>planning.dag</dag_conf>
<process_name></process_name>
</module>
</cyber>
Module: Each loaded component or binary is a module
name is the loaded module name
dag_conf is the name of the corresponding dag file of the component
process_name is the name of the mainboard process once started, and the same component of process_name will be loaded and run in the same process.
exception_handler is the handler method when the exception occurs in the process. The value can be exit or respawn listed below.
exit, which means that the entire process needs to stop running when the current process exits abnormally.
respawn, the current process needs to be restarted after abnormal exit. Start this process. If there is no such thing as it is empty, it means no treatment. Can be controlled by the user according to the specific conditions of the process
Timer¶
Timer can be used to create a timed task to run on a periodic basis, or to run only once
Timer Interface¶
/**
* @brief Construct a new Timer object
*
* @param period The period of the timer, unit is ms
* @param callback The tasks that the timer needs to perform
* @param oneshot True: perform the callback only after the first timing cycle
* False: perform the callback every timed period
*/
Timer(uint32_t period, std::function<void()> callback, bool oneshot);
Or you could encapsulate the parameters into a timer option as follows:
struct TimerOption {
uint32_t period; // The period of the timer, unit is ms
std::function<void()> callback; // The tasks that the timer needs to perform
bool oneshot; // True: perform the callback only after the first timing cycle
// False: perform the callback every timed period
};
/**
* @brief Construct a new Timer object
*
* @param opt Timer option
*/
explicit Timer(TimerOption opt);
Start Timer¶
After creating a Timer instance, you must call Timer::Start()
to start the timer.
Stop Timer¶
When you need to manually stop a timer that has already started, you can call the Timer::Stop()
interface.
Demo - example¶
#include <iostream>
#include "cyber/cyber.h"
int main(int argc, char** argv) {
cyber::Init(argv[0]);
// Print current time every 100ms
cyber::Timer timer(100, [](){
std::cout << cyber::Time::Now() << std::endl;
}, false);
timer.Start()
sleep(1);
timer.Stop();
}
Time API¶
Time is a class used to manage time; it can be used for current time acquisition, time-consuming calculation, time conversion, and so on.
The time interfaces are as follows:
// constructor, passing in a different value to construct Time
Time(uint64_t nanoseconds); //uint64_t, in nanoseconds
Time(int nanoseconds); // int type, unit: nanoseconds
Time(double seconds); // double, in seconds
Time(uint32_t seconds, uint32_t nanoseconds);
// seconds seconds + nanoseconds nanoseconds
Static Time Now(); // Get the current time
Double ToSecond() const; // convert to seconds
Uint64_t ToNanosecond() const; // Convert to nanoseconds
Std::string ToString() const; // Convert to a string in the format "2018-07-10 20:21:51.123456789"
Bool IsZero() const; // Determine if the time is 0
A code example can be seen below:
#include <iostream>
#include "cyber/cyber.h"
#include "cyber/duration.h"
int main(int argc, char** argv) {
cyber::Init(argv[0]);
Time t1(1531225311123456789UL);
std::cout << t1.ToString() << std::endl; // 2018-07-10 20:21:51.123456789
// Duration time interval
Time t1(100);
Duration d(200);
Time t2(300);
assert(d == (t1-t2)); // true
}
Record file: Read and Write¶
Reading the Reader file¶
RecordReader is the component used to read messages in the cyber framework. Each RecordReader can open an existing record file through the Open
method, and the thread will asynchronously read the information in the record file. The user only needs to execute ReadMessage to extract the latest message in RecordReader, and then get the message information through GetCurrentMessageChannelName, GetCurrentRawMessage, GetCurrentMessageTime.
RecordWriter is the component used to record messages in the cyber framework. Each RecordWriter can create a new record file through the Open method. The user only needs to execute WriteMessage and WriteChannel to write message and channel information, and the writing process is asynchronous.
Demo - example(cyber/examples/record.cc)¶
Write 100 RawMessage toTEST_FILE
through test_write
method, then read them out through test_read
method.
#include <string>
#include "cyber/cyber.h"
#include "cyber/message/raw_message.h"
#include "cyber/proto/record.pb.h"
#include "cyber/record/record_message.h"
#include "cyber/record/record_reader.h"
#include "cyber/record/record_writer.h"
using ::apollo::cyber::record::RecordReader;
using ::apollo::cyber::record::RecordWriter;
using ::apollo::cyber::record::RecordMessage;
using apollo::cyber::message::RawMessage;
const char CHANNEL_NAME_1[] = "/test/channel1";
const char CHANNEL_NAME_2[] = "/test/channel2";
const char MESSAGE_TYPE_1[] = "apollo.cyber.proto.Test";
const char MESSAGE_TYPE_2[] = "apollo.cyber.proto.Channel";
const char PROTO_DESC[] = "1234567890";
const char STR_10B[] = "1234567890";
const char TEST_FILE[] = "test.record";
void test_write(const std::string &writefile) {
RecordWriter writer;
writer.SetSizeOfFileSegmentation(0);
writer.SetIntervalOfFileSegmentation(0);
writer.Open(writefile);
writer.WriteChannel(CHANNEL_NAME_1, MESSAGE_TYPE_1, PROTO_DESC);
for (uint32_t i = 0; i < 100; ++i) {
auto msg = std::make_shared<RawMessage>("abc" + std::to_string(i));
writer.WriteMessage(CHANNEL_NAME_1, msg, 888 + i);
}
writer.Close();
}
void test_read(const std::string &readfile) {
RecordReader reader(readfile);
RecordMessage message;
uint64_t msg_count = reader.GetMessageNumber(CHANNEL_NAME_1);
AINFO << "MSGTYPE: " << reader.GetMessageType(CHANNEL_NAME_1);
AINFO << "MSGDESC: " << reader.GetProtoDesc(CHANNEL_NAME_1);
// read all message
uint64_t i = 0;
uint64_t valid = 0;
for (i = 0; i < msg_count; ++i) {
if (reader.ReadMessage(&message)) {
AINFO << "msg[" << i << "]-> "
<< "channel name: " << message.channel_name
<< "; content: " << message.content
<< "; msg time: " << message.time;
valid++;
} else {
AERROR << "read msg[" << i << "] failed";
}
}
AINFO << "static msg=================";
AINFO << "MSG validmsg:totalcount: " << valid << ":" << msg_count;
}
int main(int argc, char *argv[]) {
apollo::cyber::Init(argv[0]);
test_write(TEST_FILE);
sleep(1);
test_read(TEST_FILE);
return 0;
}
Build and run¶
Build: bazel build cyber/examples/…
Run: ./bazel-bin/cyber/examples/record
Examining result:
I1124 16:56:27.248200 15118 record.cc:64] [record] msg[0]-> channel name: /test/channel1; content: abc0; msg time: 888
I1124 16:56:27.248227 15118 record.cc:64] [record] msg[1]-> channel name: /test/channel1; content: abc1; msg time: 889
I1124 16:56:27.248239 15118 record.cc:64] [record] msg[2]-> channel name: /test/channel1; content: abc2; msg time: 890
I1124 16:56:27.248252 15118 record.cc:64] [record] msg[3]-> channel name: /test/channel1; content: abc3; msg time: 891
I1124 16:56:27.248297 15118 record.cc:64] [record] msg[4]-> channel name: /test/channel1; content: abc4; msg time: 892
I1124 16:56:27.248378 15118 record.cc:64] [record] msg[5]-> channel name: /test/channel1; content: abc5; msg time: 893
...
I1124 16:56:27.250422 15118 record.cc:73] [record] static msg=================
I1124 16:56:27.250434 15118 record.cc:74] [record] MSG validmsg:totalcount: 100:100
API Directory¶
Node API¶
For additional information and examples, refer to Node
API List¶
//create writer with user-define attr and message type
auto CreateWriter(const proto::RoleAttributes& role_attr)
-> std::shared_ptr<transport::Writer<MessageT>>;
//create reader with user-define attr, callback and message type
auto CreateReader(const proto::RoleAttributes& role_attr,
const croutine::CRoutineFunc<MessageT>& reader_func)
-> std::shared_ptr<transport::Reader<MessageT>>;
//create writer with specific channel name and message type
auto CreateWriter(const std::string& channel_name)
-> std::shared_ptr<transport::Writer<MessageT>>;
//create reader with specific channel name, callback and message type
auto CreateReader(const std::string& channel_name,
const croutine::CRoutineFunc<MessageT>& reader_func)
-> std::shared_ptr<transport::Reader<MessageT>>;
//create reader with user-define config, callback and message type
auto CreateReader(const ReaderConfig& config,
const CallbackFunc<MessageT>& reader_func)
-> std::shared_ptr<cybertron::Reader<MessageT>>;
//create service with name and specific callback
auto CreateService(const std::string& service_name,
const typename service::Service<Request, Response>::ServiceCallback& service_calllback)
-> std::shared_ptr<service::Service<Request, Response>>;
//create client with name to send request to server
auto CreateClient(const std::string& service_name)
-> std::shared_ptr<service::Client<Request, Response>>;
Writer API¶
For additional information and examples, refer to Writer
API List¶
bool Write(const std::shared_ptr<MessageT>& message);
Client API¶
For additional information and examples, refer to Client
API List¶
SharedResponse SendRequest(SharedRequest request,
const std::chrono::seconds& timeout_s = std::chrono::seconds(5));
SharedResponse SendRequest(const Request& request,
const std::chrono::seconds& timeout_s = std::chrono::seconds(5));
Parameter API¶
The interface that the user uses to perform parameter related operations:
Set the parameter related API.
Read the parameter related API.
Create a ParameterService to provide parameter service related APIs for other nodes.
Create a ParameterClient that uses the parameters provided by other nodes to service related APIs.
For additional information and examples, refer to Parameter
API List - Setting parameters¶
Parameter(); // Name is empty, type is NOT_SET
explicit Parameter(const Parameter& parameter);
explicit Parameter(const std::string& name); // Type is NOT_SET
Parameter(const std::string& name, const bool bool_value);
Parameter(const std::string& name, const int int_value);
Parameter(const std::string& name, const int64_t int_value);
Parameter(const std::string& name, const float double_value);
Parameter(const std::string& name, const double double_value);
Parameter(const std::string& name, const std::string& string_value);
Parameter(const std::string& name, const char* string_value);
Parameter(const std::string& name, const std::string& msg_str,
const std::string& full_name, const std::string& proto_desc);
Parameter(const std::string& name, const google::protobuf::Message& msg);
API List - Reading parameters¶
inline ParamType type() const;
inline std::string TypeName() const;
inline std::string Descriptor() const;
inline const std::string Name() const;
inline bool AsBool() const;
inline int64_t AsInt64() const;
inline double AsDouble() const;
inline const std::string AsString() const;
std::string DebugString() const;
template <typename Type>
typename std::enable_if<std::is_base_of<google::protobuf::Message, Type>::value, Type>::type
value() const;
template <typename Type>
typename std::enable_if<std::is_integral<Type>::value && !std::is_same<Type, bool>::value, Type>::type
value() const;
template <typename Type>
typename std::enable_if<std::is_floating_point<Type>::value, Type>::type
value() const;
template <typename Type>
typename std::enable_if<std::is_convertible<Type, std::string>::value, const std::string&>::type
value() const;
template <typename Type>
typename std::enable_if<std::is_same<Type, bool>::value, bool>::type
value() const;
API List - Creating parameter service¶
explicit ParameterService(const std::shared_ptr<Node>& node);
void SetParameter(const Parameter& parameter);
bool GetParameter(const std::string& param_name, Parameter* parameter);
bool ListParameters(std::vector<Parameter>* parameters);
API List - Creating parameter client¶
ParameterClient(const std::shared_ptr<Node>& node, const std::string& service_node_name);
bool SetParameter(const Parameter& parameter);
bool GetParameter(const std::string& param_name, Parameter* parameter);
bool ListParameters(std::vector<Parameter>* parameters);
Timer API¶
You can set the parameters of the Timer and call the start and stop interfaces to start the timer and stop the timer. For additional information and examples, refer to Timer
API List¶
Timer(uint32_t period, std::function<void()> callback, bool oneshot);
Timer(TimerOption opt);
void SetTimerOption(TimerOption opt);
void Start();
void Stop();
Time API¶
For additional information and examples, refer to Time
API List¶
static const Time MAX;
static const Time MIN;
Time() {}
explicit Time(uint64_t nanoseconds);
explicit Time(int nanoseconds);
explicit Time(double seconds);
Time(uint32_t seconds, uint32_t nanoseconds);
Time(const Time& other);
static Time Now();
static Time MonoTime();
static void SleepUntil(const Time& time);
double ToSecond() const;
uint64_t ToNanosecond() const;
std::string ToString() const;
bool IsZero() const;
Duration API¶
Interval-related interface, used to indicate the time interval, can be initialized according to the specified nanosecond or second.
API List¶
Duration() {}
Duration(int64_t nanoseconds);
Duration(int nanoseconds);
Duration(double seconds);
Duration(uint32_t seconds, uint32_t nanoseconds);
Duration(const Rate& rate);
Duration(const Duration& other);
double ToSecond() const;
int64_t ToNanosecond() const;
bool IsZero() const;
void Sleep() const;
Rate API¶
The frequency interface is generally used to initialize the time of the sleep frequency after the object is initialized according to the specified frequency.
API List¶
Rate(double frequency);
Rate(uint64_t nanoseconds);
Rate(const Duration&);
void Sleep();
void Reset();
Duration CycleTime() const;
Duration ExpectedCycleTime() const { return expected_cycle_time_; }
RecordReader API¶
The interface for reading the record file is used to read the message and channel information in the record file.
API List¶
RecordReader();
bool Open(const std::string& filename, uint64_t begin_time = 0,
uint64_t end_time = UINT64_MAX);
void Close();
bool ReadMessage();
bool EndOfFile();
const std::string& CurrentMessageChannelName();
std::shared_ptr<RawMessage> CurrentRawMessage();
uint64_t CurrentMessageTime();
RecordWriter API¶
The interface for writing the record file, used to record the message and channel information into the record file.
API List¶
RecordWriter();
bool Open(const std::string& file);
void Close();
bool WriteChannel(const std::string& name, const std::string& type,
const std::string& proto_desc);
template <typename MessageT>
bool WriteMessage(const std::string& channel_name, const MessageT& message,
const uint64_t time_nanosec,
const std::string& proto_desc = "");
bool SetSizeOfFileSegmentation(uint64_t size_kilobytes);
bool SetIntervalOfFileSegmentation(uint64_t time_sec);