# -*- coding: utf-8 -*-
#
# Copyright 2016 - 2018 Ternaris.
# SPDX-License-Identifier: AGPL-3.0-only
from __future__ import absolute_import, division, print_function
import re
import sys
from collections import defaultdict, namedtuple
from itertools import groupby
from logging import getLogger
import capnp
import genpy
import rosbag
from rosbag.bag import _get_message_type
import marv
import marv_nodes
from marv.scanner import DatasetInfo
from .bag_capnp import Bagmeta, Header, Message
# Regular expression used to aggregate individual bags into sets (see
# below). Bags with the same *name* and consecutive *idx* starting
# with 0 are put into sets. If one bag is missing all remaining bags
# will result in sets with one bag each.
REGEX = re.compile(r"""
^
(?P<basename>
    (?P<name>
        .+?
    )
    (?:
        _(?P<timestamp>
            \d{4}(?:-\d{2}){5}
        )
    )?
    (?:
        _(?P<idx>
            \d+
        )
    )?
)
\.bag$
""", re.VERBOSE)
_Baginfo = namedtuple('Baginfo', 'filename basename name timestamp idx')


class Baginfo(_Baginfo):
    """Metadata for a single bag file parsed out of its filename.

    ``timestamp`` and ``idx`` default to ``None`` when absent; a
    present ``idx`` is coerced from string to ``int``.
    """

    def __new__(cls, filename, basename, name, timestamp=None, idx=None):
        if idx is not None:
            idx = int(idx)
        return super(Baginfo, cls).__new__(cls, filename, basename, name,
                                           timestamp, idx)
def scan(dirpath, dirnames, filenames):
    """Default scanner for ROS bag files

    Bags suffixed with a consecutive index are grouped into sets::

        foo_0.bag
        foo_1.bag
        foo_3.bag
        foo_4.bag

    results in::

        foo [foo_0.bag, foo_1.bag]
        foo_3 [foo_3.bag]
        foo_4 [foo_4.bag]

    In this example the bag with index 2 is missing which results in
    foo_3 and foo_4 to be individual sets with one bag each.

    The timestamps used by ``rosbag record`` are stripped from the
    name given to sets, but are kept for the remaining individual sets
    in case a bag is missing::

        foo_2018-01-12-14-05-12_0.bag
        foo_2018-01-12-14-45-23_1.bag
        foo_2018-01-12-14-55-42_3.bag

    results in::

        foo [foo_2018-01-12-14-05-12_0.bag,
             foo_2018-01-12-14-45-23_1.bag]
        foo_2018-01-12-14-45-23_1 [foo_2018-01-12-14-45-23_1.bag]
        foo_2018-01-12-14-55-42_3 [foo_2018-01-12-14-55-42_3.bag]

    For more information on scanners see :any:`marv.scanner`.

    Args:
        dirpath (str): The path to the directory currently being
            scanned.
        dirnames (str): Sorted list of subdirectories of the directory
            currently being scanned.  Change this in-place to control
            further traversal.
        filenames (str): Sorted list of files within the directory
            currently being scanned.

    Returns:
        A list of :class:`marv.scanner.DatasetInfo` instances mapping
        set of files to dataset names.  Absolute filenames must
        start with :paramref:`.dirpath`, relative filenames are
        automatically prefixed with it.

    See :ref:`cfg_c_scanner` config key.
    """
    # Walk bags in reverse name order so indices are seen descending;
    # groupby assumes bags of one *name* are adjacent (filenames are sorted).
    groups = groupby([Baginfo(x, **REGEX.match(x).groupdict())
                      for x in reversed(filenames)
                      if x.endswith('.bag')],
                     lambda x: x.name)
    bags = []
    datasets = []
    for name, group in groups:
        group = list(group)
        prev_idx = None
        for bag in group:
            # Walking descending indices: the next expected idx is one less
            # than the previously seen one (or the bag's own on first visit).
            expected_idx = bag.idx if prev_idx is None else prev_idx - 1
            # A gap in indices, or a mix of timestamped and untimestamped
            # bags, breaks the current run: flush accumulated bags as
            # individual single-bag sets named by their full basename.
            if bag.idx != expected_idx or \
               bags and (bags[0].timestamp is None) != (bag.timestamp is None):
                datasets[0:0] = [DatasetInfo(x.basename, [x.filename])
                                 for x in bags]
                bags[:] = []
            bags.insert(0, bag)
            prev_idx = bag.idx
            if bag.idx == 0:
                # A run reaching index 0 is complete: emit one set under the
                # timestamp-stripped name.
                datasets.insert(0, DatasetInfo(name, [x.filename for x in bags]))
                bags[:] = []
            elif bag.idx is None:
                # Unindexed bag: always its own single-bag set.
                assert len(bags) == 1, bags
                datasets.insert(0, DatasetInfo(bag.basename, [bag.filename]))
                bags[:] = []
        # Leftover bags (run never reached index 0) become individual sets.
        datasets[0:0] = [DatasetInfo(x.basename, [x.filename]) for x in bags]
        bags[:] = []
    return datasets
def read_messages(paths, topics=None, start_time=None, end_time=None):
    """Iterate chronologically raw BagMessage for topic from paths.

    Args:
        paths: Bag file paths to read from.
        topics: Optional list of topic names to restrict to.
        start_time: Optional inclusive lower bound passed to rosbag.
        end_time: Optional inclusive upper bound passed to rosbag.

    Yields:
        rosbag ``BagMessage`` tuples ``(topic, raw, timestamp)`` merged
        from all bags in non-decreasing timestamp order.
    """
    bags = {path: rosbag.Bag(path) for path in paths}
    gens = {path: bag.read_messages(topics=topics, start_time=start_time,
                                    end_time=end_time, raw=True)
            for path, bag in bags.items()}
    msgs = {}  # buffered head-of-stream message per still-open bag
    prev_timestamp = genpy.Time(0)
    while True:
        # Refill the buffer for every generator whose head was consumed;
        # exhausted bags are closed and dropped.  (set() instead of the
        # Python-2-only viewkeys(), next() instead of gen.next(), so this
        # works on Python 2 and 3.)
        for key in set(gens) - set(msgs):
            try:
                msgs[key] = next(gens[key])
            except StopIteration:
                bags[key].close()
                del bags[key]
                del gens[key]
        if not msgs:
            break
        # Emit the buffered message with the smallest timestamp
        # (min() replaces the former py2-builtin reduce()).
        next_key = min(msgs.items(), key=lambda item: item[1].timestamp)[0]
        next_msg = msgs.pop(next_key)
        assert next_msg.timestamp >= prev_timestamp
        yield next_msg
        prev_timestamp = next_msg.timestamp
@marv.node(Message, Header, group='ondemand')
@marv.input('dataset', marv_nodes.dataset)
@marv.input('bagmeta', bagmeta)
def raw_messages(dataset, bagmeta):
    """Stream messages from a set of bag files."""
    bagmeta, dataset = yield marv.pull_all(bagmeta, dataset)
    bagtopics = bagmeta.topics
    connections = bagmeta.connections
    paths = [x.path for x in dataset.files if x.path.endswith('.bag')]
    requested = yield marv.get_requested()
    # The logger is not used here, but the yield registers it with marv.
    yield marv.get_logger()

    alltopics = set()
    bytopic = defaultdict(list)
    groups = {}
    for name in [x.name for x in requested]:
        if ':' in name:
            # "topic:type" request, either side may be a '*' wildcard;
            # matching topics are published as streams of a group.
            reqtop, reqtype = name.split(':')
            # BUG: topic with more than one type is not supported
            topics = [con.topic for con in connections
                      if ((reqtop == '*' or reqtop == con.topic) and
                          (reqtype == '*' or reqtype == con.datatype))]
            group = groups[name] = yield marv.create_group(name)
            create_stream = group.create_stream
        else:
            # Plain topic request: one top-level stream, if the bag has it.
            topics = [name] if name in bagtopics else []
            group = None
            create_stream = marv.create_stream

        for topic in topics:
            # BUG: topic with more than one type is not supported
            con = next(x for x in connections if x.topic == topic)
            # TODO: start/end_time per topic?
            header = {'start_time': bagmeta.start_time,
                      'end_time': bagmeta.end_time,
                      'msg_count': con.msg_count,
                      'msg_type': con.datatype,
                      'msg_type_def': con.msg_def,
                      'msg_type_md5sum': con.md5sum,
                      'topic': topic}
            stream = yield create_stream(topic, **header)
            bytopic[topic].append(stream)
        alltopics.update(topics)
        if group:
            yield group.finish()

    if not alltopics:
        return

    # BUG: topic with more than one type is not supported
    for topic, raw, t in read_messages(paths, topics=list(alltopics)):
        # raw[1] is the serialized message buffer of the rosbag raw tuple.
        dct = {'data': raw[1], 'timestamp': t.to_nsec()}
        for stream in bytopic[topic]:
            yield stream.msg(dct)


messages = raw_messages
# Minimal stand-in for a rosbag connection info object, carrying just the
# fields rosbag.bag._get_message_type reads.
_ConnectionInfo = namedtuple('_ConnectionInfo', 'md5sum datatype msg_def')


def get_message_type(stream):
    """ROS message type from definition stored for stream.

    Args:
        stream: A marv stream whose header carries ``msg_type``,
            ``msg_type_def`` and ``msg_type_md5sum`` (as written by
            :func:`raw_messages`).

    Returns:
        The message class rosbag generates from the stored definition.
    """
    assert stream.msg_type
    assert stream.msg_type_def
    assert stream.msg_type_md5sum
    info = _ConnectionInfo(md5sum=stream.msg_type_md5sum,
                           datatype=stream.msg_type,
                           msg_def=stream.msg_type_def)
    return _get_message_type(info)