ROS1 rosbag的详细使用并且使用python合并bag包的方法

寻技术 Python编程 2024年01月14日 83

在使用ros的时候经常会用到rosbag来录制或者回放算法,是个非常有用的工具。

rosbag 命令列表

命令 作用
record 录制一个包,并且指定topic
info 总结一个包的详细信息
play 回放一个或者多个包,并且可以指定topic
check 确定一个包是否可以在当前系统中播放,或者是否可以迁移
fix 修复一个包使得它能在当前系统中播放
filter 通过python脚本转换包内容
compress 压缩包
decompress 解压缩包
reindex 重新索引一个或者多个损坏的包

record 命令使用

record <topic-names> # 指定一个或者多个topic来录制

$ rosbag record body_status gnss_imu

record -a, --all # 录制所有topic

$ rosbag record -a

record -e, --regex # 录制使用正则表达式匹配的topic

$ rosbag record -e "/(.*)_log/point"

rocord -p, -publish # (Melodic 新特性)当开始录制包时,发布一个topic=”begin_write“

$ rosbag record -p

record -x, --exclude # 录制时排除正则表达式匹配的topic

$ rosbag record -e "/(.*)_log/point" -x "(.*)/point"

record -q, --quiet # 录制时抑制指定topic的log输出

$ rosbag record -q /body_status

record -d, --duration # 指定topic录制最大时常

$ rosbag record -d 30 /body_status
$ rosbag record -d 3m /body_status
$ rosbag record -d 3h /body_status

record -o, --output-prefix # 输出包名前加上前缀

$ rosbag record -o output /topic_name /topic_name2 

record -O, --output-name # 指定输出包名

$ rosbag record -O output.bag /topic_name /topic_name2 

record --split # 每隔设定的时常或者大小分割包录制

$ rosbag record --split --size=1024 /topic_name
$ rosbag record --split --duration=30 /topic_name
$ rosbag record --split --duration=5m /topic_name
$ rosbag record --split --duration=2h /topic_name

record --max-splits # (Kinetic 新特性)分割包的最大个数

$ rosbag record --split --max-splits 3 --duration=30 /topic_name

record -b, --buffsize # 使用内部缓冲区(默认值:256MB,无限为0)

$ rosbag record -b 1024 /topic_name

record --chunksize # 开辟缓冲区,比buffsize更为先进(默认值: 768KB,无限为0)

$ rosbag record --chunksize=1024 /topic_name

record -l, --limit # 在topic中只录制指定数量的消息

$ rosbag record -l 1000 /topic_name

record --node # 记录指定节点订阅的所有消息

$ rosbag record --node=/node

record -j, --bz2 # 设置录制以bz2格式压缩

$ rosbag record -j /topic_name

record --lz4 # 设置录制以lz4格式压缩

$ rosbag record --lz4 /topic_name
  • record --tcpnodelay # 订阅主题时使用TCP_NODELAY传输 参考资料
  • record --udp # 订阅主题时使用UDP传输

info 指令使用

info <bag-files> # 查看一个或者多个包的详细信息

$ rosbag info test.bag

info -y, --yaml # 以yaml格式输出一个或者多个包的详细信息

$ rosbag info -y test.bag

info -k, --key # 查看以yaml格式输出,包内key值对应的数据

$ rosbag info -y -k topics test.bag

play 指令使用

play <bag_name> # 播放一个或者多个数据包

$ rosbag play test.bag

play -p, --prefix # 为所有输出主题添加前缀

$ rosbag play -p test test.bag

play -q, --quiet # 抑制控制台输出

$ rosbag play -q test.bag

play -i, --immediate # 播放包不等待

$ rosbag play -i test.bag

play --pause # 开始播放包时暂停

$ rosbag play --pause test.bag

play --queue # 播放包时以设置的队列大小播放 默认值为100

$ rosbag play --queue=1000 test.bag

play --clock # 自动发布一个clocktopic

$ rosbag play --clock test.bag

play --hz # 以指定赫兹发布clock topic

$ rosbag play --hz=1 --clock test.bag

play -d, --delay # 指定延迟播放的时间

$ rosbag -d 5 test.bag

play -r, --rate # 指定播放速度

$ rosbag -r 10 test.bag

play -s, --start # 指定开始播放时间

$ rosbag play -s 5 test.bag 

play -u,--duration # 设定播放时间

$ rosbag play -u 240 test.bag

play --skip-empty # 设置跳过没有消息的数据的时间

$ rosbag play --skip-empty=1 test.bag 

play -l, --loop # 循环播放

$ rosbag play -l test.bag

play -k, --keep-alive # 播放时保持活跃状态

$ rosbag play -k test.bag

play --try-future-version # 即使不知道版本号也可以打开bag

$ rosbag play --try-future-version test.bag

play --topics # 指定对应topics进行播放

$ rosbag play test.bag --topics /topic1 /topic2

play --pause-topics # 当回放数据时,暂停指定topics

$ rosbag play test.bag --topics /topic1 /topic2 --pause-topics

play --bags # 指定多个bags进行回放

$ rosbag play --bags=test.bag 

play --wait-for-subscribers # 再发布之前,等待每个主题至少有一个订阅者play --rate-control-topic= # 观看给定的主题,如果上一次发布时间超过了<速率控制最大延迟>,请等待该主题再次发布后再继续播放play --rate-control-max-delay= # 暂停前与<速率控制主题>的最大时间差 check 指令使用 check <file> # 确定一个包在当前系统中是否可以播放。

$ rosbag check test.bag

check -g, --genrules # 生成一个名为RULEFILE的迁移规则文件

$ rosbag check -g diagnostics.bmr test.bag

check -a, --append # 加载后附加到现有数据迁移规则文件的末尾。

$ rosbag check -a -g diagnostics.bmr test.bag

check -n, --noplugins # 不加载规则文件

$ rosbag check -n test.bag

fix 指令使用

fix <in-bag> <out-bag> [rules.bmr] # 使用规则文件修复数据包

$ rosbag fix old.bag repaired.bag myrules.bmr

fix -n, --noplugins # 修复数据包不带规则文件

$ rosbag fix -n old.bag repaired.bag

filter 指令使用

filter <in-bag> <out-bag> <expression> # 使用给定的Python表达式转换数据文件。

$ rosbag filter my.bag only-tf.bag "topic == '/tf'"

compress 指令使用

compress <bag-files> # 使用BZ2格式压缩包

$ rosbag compress *.bag

compress --output-dir=DIR # 指定路径输出文件

$ rosbag compress *.bag --output-dir=compressed

compress -f, --force # 如果已经存在包,则强制覆盖

$ rosbag compress -f *.bag

compress -q, --quiet # 抑制非关键信息

$ rosbag compress -q *.bag

compress -j, --bz2 # 使用bz2格式压缩包

$ rosbag compress -j *.bag

compress --lz4 # 使用lz4格式压缩包

$ rosbag compress --lz4 *.bag

decompress 指令使用 decompress <bag-files> # 解压缩包

$ rosbag decompress *.bag

decompress --output-dir=DIR # 指定解压缩的路径

$ rosbag decompress --output-dir=uncompressed *.bag

decompress -f, --force # 如果已经存在包,则强制覆盖

$ rosbag decompress -f *.bag

decompress -q, --quiet # 抑制非关键信息

$ rosbag decompress -q *.bag

reindex 指令使用 reindex <bag-files> # 修复坏包

$ rosbag reindex *.bag

reindex --output-dir=DIR # 指定路径

$ rosbag reindex --output-dir=reindexed *.bag

reindex -f, --force # 如果已经存在包,则强制覆盖

$ rosbag reindex -f *.bag

reindex -q, --quiet # 抑制非关键信息

$ rosbag reindex -q *.bag

2. 使用Python合并包代码

#! /usr/bin/env python3
import os
import time
import argparse
from rosbag import Bag, Compression
def parse_compression(compression):
    if compression == "none" or compression == "NONE":
        compression = Compression.NONE
    elif compression == "bz2":
        compression = Compression.BZ2
    elif compression == "lz4":
        compression = Compression.LZ4
    return compression
def get_files_list(arg_input, arg_select):
    files = None
    if " " in arg_input:
        files = arg_input.split(" ")
    elif "," in arg_input:
        files = arg_input.split(",")
    elif os.path.exists(args.input) or "*" in args.input:
        rfind_index = arg_input.rfind("/")
        dir_path = arg_input[:rfind_index]
        files = os.listdir(dir_path)
        files.sort() # 名称排序
        files = [dir_path + "/" + file for file in files]
    if arg_select:
        files = select_bags(files)
    print("bags list:")
    for i in range(len(files)):
        print(str(i) + "." + files[i])
    return files
def select_bags(files):
    for i in range(len(files)):
        print(str(i) + "." + files[i])
    print("Please input bag numbers to merge. split by , or space", end=":")
    s_b = input()
    s_b_list = []
    if "," in s_b:
        s_b_list = s_b.split(",")
    elif " " in s_b:
        s_b_list = s_b.split(" ")
    files = [files[int(x)] for x in s_b_list]
    return files
def show_process_bar(total, i, start):
    a = "*" * i
    b = "." * (total - i)
    c = (i / total) * 100
    dur = time.perf_counter() - start
    print("\r{:^3.0f}%[{}->{}]{:.2f}s".format(c,a,b,dur), end="")
def merge_bags(args):
    print("start merge bags.")
    compression = parse_compression(args.compression)
    print(f"bag's compression mode is {compression}.")
    files = get_files_list(args.input, args.select)
    with Bag(args.output, "w",  compression=compression) as o:
        start = time.perf_counter()
        for i in range(len(files)):
            show_process_bar(len(files), i+1, start)
            with Bag(files[i], "r") as ib:
                for topic, msg, t in ib:
                    o.write(topic, msg, t)
            show_process_bar(len(files), i+1, start)
if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Merge one or more bag files to one file.")
    parser.add_argument("-o", "--output", help="Output bag's file path.",
                        default="output.bag", required=True)
    parser.add_argument("-i", "--input", help="Input files bags name or path, split by , .",
                        required=True)
    parser.add_argument("-c", "--compression", help="Compress the bag by bz2 or lz4",
                        default="lz4", choices=["none", "lz4", "bz2"])
    parser.add_argument("-s", "--select", help="Select bags to merge.",
                        default=False)
    parser.add_argument("-v", "--verbose", help="Show the verbose msg.")
    args = parser.parse_args()
    merge_bags(args)        

Reference:

http://wiki.ros.org/rosbag/Commandline

原文地址:https://blog.csdn.net/xiaokai1999/article/details/130504577
关闭

用微信“扫一扫”