2023-01-12 01:04:47 +00:00
|
|
|
"""
|
|
|
|
Mapsforge map file parser (for version 3 files).
|
|
|
|
|
|
|
|
Author: Oliver Gerlich
|
|
|
|
|
|
|
|
References:
|
|
|
|
- http://code.google.com/p/mapsforge/wiki/SpecificationBinaryMapFile
|
|
|
|
- http://mapsforge.org/
|
|
|
|
"""
|
|
|
|
|
|
|
|
from hachoir.parser import Parser
|
|
|
|
from hachoir.field import (Bit, Bits, UInt8, UInt16, UInt32, Int32, UInt64, String,
|
2023-10-07 23:04:41 +00:00
|
|
|
PaddingBits,
|
|
|
|
Enum, Field, FieldSet, SeekableFieldSet, RootSeekableFieldSet)
|
2023-01-12 01:04:47 +00:00
|
|
|
from hachoir.core.endian import BIG_ENDIAN
|
|
|
|
|
|
|
|
|
|
|
|
# micro-degrees factor:
|
|
|
|
UDEG = float(1000 * 1000)
|
|
|
|
|
|
|
|
|
|
|
|
CoordinateEncoding = {
|
|
|
|
0: "single delta encoding",
|
|
|
|
1: "double delta encoding",
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
class UIntVbe(Field):
|
|
|
|
|
|
|
|
def __init__(self, parent, name, description=None):
|
|
|
|
Field.__init__(self, parent, name, description=description)
|
|
|
|
|
|
|
|
value = 0
|
|
|
|
size = 0
|
|
|
|
while True:
|
|
|
|
byteValue = self._parent.stream.readBytes(
|
|
|
|
self.absolute_address + (size * 8), 1)[0]
|
|
|
|
|
|
|
|
haveMoreData = (byteValue & 0x80)
|
|
|
|
value = value | ((byteValue & 0x7f) << (size * 7))
|
|
|
|
size += 1
|
|
|
|
assert size < 100, "UIntVBE is too large"
|
|
|
|
|
2023-10-07 23:04:41 +00:00
|
|
|
if not haveMoreData:
|
2023-01-12 01:04:47 +00:00
|
|
|
break
|
|
|
|
|
|
|
|
self._size = size * 8
|
|
|
|
self.createValue = lambda: value
|
|
|
|
|
|
|
|
|
|
|
|
class IntVbe(Field):
|
|
|
|
|
|
|
|
def __init__(self, parent, name, description=None):
|
|
|
|
Field.__init__(self, parent, name, description=description)
|
|
|
|
|
|
|
|
value = 0
|
|
|
|
size = 0
|
|
|
|
shift = 0
|
|
|
|
while True:
|
|
|
|
byteValue = self._parent.stream.readBytes(
|
|
|
|
self.absolute_address + (size * 8), 1)[0]
|
|
|
|
|
|
|
|
haveMoreData = (byteValue & 0x80)
|
|
|
|
if size == 0:
|
|
|
|
isNegative = (byteValue & 0x40)
|
|
|
|
value = (byteValue & 0x3f)
|
|
|
|
shift += 6
|
|
|
|
else:
|
|
|
|
value = value | ((byteValue & 0x7f) << shift)
|
|
|
|
shift += 7
|
|
|
|
size += 1
|
|
|
|
assert size < 100, "IntVBE is too large"
|
|
|
|
|
2023-10-07 23:04:41 +00:00
|
|
|
if not haveMoreData:
|
2023-01-12 01:04:47 +00:00
|
|
|
break
|
|
|
|
|
|
|
|
if isNegative:
|
|
|
|
value *= -1
|
|
|
|
|
|
|
|
self._size = size * 8
|
|
|
|
self.createValue = lambda: value
|
|
|
|
|
|
|
|
|
|
|
|
class VbeString(FieldSet):
|
|
|
|
|
|
|
|
def createFields(self):
|
|
|
|
yield UIntVbe(self, "length")
|
|
|
|
yield String(self, "chars", self["length"].value, charset="UTF-8")
|
|
|
|
|
|
|
|
def createDescription(self):
|
|
|
|
return '(%d B) "%s"' % (self["length"].value, self["chars"].value)
|
|
|
|
|
|
|
|
|
|
|
|
class TagStringList(FieldSet):
|
|
|
|
|
|
|
|
def createFields(self):
|
|
|
|
yield UInt16(self, "num_tags")
|
|
|
|
for i in range(self["num_tags"].value):
|
|
|
|
yield VbeString(self, "tag[]")
|
|
|
|
|
|
|
|
def createDescription(self):
|
|
|
|
return "%d tag strings" % self["num_tags"].value
|
|
|
|
|
|
|
|
|
|
|
|
class ZoomIntervalCfg(FieldSet):
|
|
|
|
|
|
|
|
def createFields(self):
|
|
|
|
yield UInt8(self, "base_zoom_level")
|
|
|
|
yield UInt8(self, "min_zoom_level")
|
|
|
|
yield UInt8(self, "max_zoom_level")
|
|
|
|
yield UInt64(self, "subfile_start")
|
|
|
|
yield UInt64(self, "subfile_size")
|
|
|
|
|
|
|
|
def createDescription(self):
|
|
|
|
return "zoom level around %d (%d - %d)" % (self["base_zoom_level"].value,
|
|
|
|
self["min_zoom_level"].value, self["max_zoom_level"].value)
|
|
|
|
|
|
|
|
|
|
|
|
class TileIndexEntry(FieldSet):
|
|
|
|
|
|
|
|
def createFields(self):
|
|
|
|
yield Bit(self, "is_water_tile")
|
|
|
|
yield Bits(self, "offset", 39)
|
|
|
|
|
|
|
|
|
|
|
|
class TileZoomTable(FieldSet):
|
|
|
|
|
|
|
|
def createFields(self):
|
|
|
|
yield UIntVbe(self, "num_pois")
|
|
|
|
yield UIntVbe(self, "num_ways")
|
|
|
|
|
|
|
|
def createDescription(self):
|
|
|
|
return "%d POIs, %d ways" % (self["num_pois"].value, self["num_ways"].value)
|
|
|
|
|
|
|
|
|
|
|
|
class TileHeader(FieldSet):
|
|
|
|
|
|
|
|
def __init__(self, parent, name, zoomIntervalCfg, **kw):
|
|
|
|
FieldSet.__init__(self, parent, name, **kw)
|
|
|
|
self.zoomIntervalCfg = zoomIntervalCfg
|
|
|
|
|
|
|
|
def createFields(self):
|
|
|
|
numLevels = int(self.zoomIntervalCfg[
|
|
|
|
"max_zoom_level"].value - self.zoomIntervalCfg["min_zoom_level"].value) + 1
|
2023-10-07 23:04:41 +00:00
|
|
|
assert (numLevels < 50)
|
2023-01-12 01:04:47 +00:00
|
|
|
for i in range(numLevels):
|
|
|
|
yield TileZoomTable(self, "zoom_table_entry[]")
|
|
|
|
yield UIntVbe(self, "first_way_offset")
|
|
|
|
|
|
|
|
|
|
|
|
class POIData(FieldSet):
|
|
|
|
|
|
|
|
def createFields(self):
|
|
|
|
if self["/have_debug"].value:
|
|
|
|
yield String(self, "signature", 32)
|
|
|
|
if not self['signature'].value.startswith("***POIStart"):
|
|
|
|
raise ValueError
|
|
|
|
|
|
|
|
yield IntVbe(self, "lat_diff")
|
|
|
|
yield IntVbe(self, "lon_diff")
|
|
|
|
yield Bits(self, "layer", 4)
|
|
|
|
yield Bits(self, "num_tags", 4)
|
|
|
|
|
|
|
|
for i in range(self["num_tags"].value):
|
|
|
|
yield UIntVbe(self, "tag_id[]")
|
|
|
|
|
|
|
|
yield Bit(self, "have_name")
|
|
|
|
yield Bit(self, "have_house_number")
|
|
|
|
yield Bit(self, "have_ele")
|
|
|
|
yield PaddingBits(self, "pad[]", 5)
|
|
|
|
|
|
|
|
if self["have_name"].value:
|
|
|
|
yield VbeString(self, "name")
|
|
|
|
if self["have_house_number"].value:
|
|
|
|
yield VbeString(self, "house_number")
|
|
|
|
if self["have_ele"].value:
|
|
|
|
yield IntVbe(self, "ele")
|
|
|
|
|
|
|
|
def createDescription(self):
|
|
|
|
s = "POI"
|
|
|
|
if self["have_name"].value:
|
|
|
|
s += ' "%s"' % self["name"]["chars"].value
|
|
|
|
s += " @ %f/%f" % (self["lat_diff"].value / UDEG,
|
|
|
|
self["lon_diff"].value / UDEG)
|
|
|
|
return s
|
|
|
|
|
|
|
|
|
|
|
|
class SubTileBitmap(FieldSet):
|
|
|
|
static_size = 2 * 8
|
|
|
|
|
|
|
|
def createFields(self):
|
|
|
|
for y in range(4):
|
|
|
|
for x in range(4):
|
|
|
|
yield Bit(self, "is_used[%d,%d]" % (x, y))
|
|
|
|
|
|
|
|
|
|
|
|
class WayProperties(FieldSet):
|
|
|
|
|
|
|
|
def createFields(self):
|
|
|
|
if self["/have_debug"].value:
|
|
|
|
yield String(self, "signature", 32)
|
|
|
|
if not self['signature'].value.startswith("---WayStart"):
|
|
|
|
raise ValueError
|
|
|
|
|
|
|
|
yield UIntVbe(self, "way_data_size")
|
|
|
|
|
|
|
|
# WayProperties is split into an outer and an inner field, to allow
|
|
|
|
# specifying data size for inner part:
|
|
|
|
yield WayPropertiesInner(self, "inner", size=self["way_data_size"].value * 8)
|
|
|
|
|
|
|
|
|
|
|
|
class WayPropertiesInner(FieldSet):
|
|
|
|
|
|
|
|
def createFields(self):
|
|
|
|
yield SubTileBitmap(self, "sub_tile_bitmap")
|
|
|
|
# yield Bits(self, "sub_tile_bitmap", 16)
|
|
|
|
|
|
|
|
yield Bits(self, "layer", 4)
|
|
|
|
yield Bits(self, "num_tags", 4)
|
|
|
|
|
|
|
|
for i in range(self["num_tags"].value):
|
|
|
|
yield UIntVbe(self, "tag_id[]")
|
|
|
|
|
|
|
|
yield Bit(self, "have_name")
|
|
|
|
yield Bit(self, "have_house_number")
|
|
|
|
yield Bit(self, "have_ref")
|
|
|
|
yield Bit(self, "have_label_position")
|
|
|
|
yield Bit(self, "have_num_way_blocks")
|
|
|
|
yield Enum(Bit(self, "coord_encoding"), CoordinateEncoding)
|
|
|
|
yield PaddingBits(self, "pad[]", 2)
|
|
|
|
|
|
|
|
if self["have_name"].value:
|
|
|
|
yield VbeString(self, "name")
|
|
|
|
if self["have_house_number"].value:
|
|
|
|
yield VbeString(self, "house_number")
|
|
|
|
if self["have_ref"].value:
|
|
|
|
yield VbeString(self, "ref")
|
|
|
|
if self["have_label_position"].value:
|
|
|
|
yield IntVbe(self, "label_lat_diff")
|
|
|
|
yield IntVbe(self, "label_lon_diff")
|
|
|
|
numWayDataBlocks = 1
|
|
|
|
if self["have_num_way_blocks"].value:
|
|
|
|
yield UIntVbe(self, "num_way_blocks")
|
|
|
|
numWayDataBlocks = self["num_way_blocks"].value
|
|
|
|
|
|
|
|
for i in range(numWayDataBlocks):
|
|
|
|
yield WayData(self, "way_data[]")
|
|
|
|
|
|
|
|
def createDescription(self):
|
|
|
|
s = "way"
|
|
|
|
if self["have_name"].value:
|
|
|
|
s += ' "%s"' % self["name"]["chars"].value
|
|
|
|
return s
|
|
|
|
|
|
|
|
|
|
|
|
class WayData(FieldSet):
|
|
|
|
|
|
|
|
def createFields(self):
|
|
|
|
yield UIntVbe(self, "num_coord_blocks")
|
|
|
|
for i in range(self["num_coord_blocks"].value):
|
|
|
|
yield WayCoordBlock(self, "way_coord_block[]")
|
|
|
|
|
|
|
|
|
|
|
|
class WayCoordBlock(FieldSet):
|
|
|
|
|
|
|
|
def createFields(self):
|
|
|
|
yield UIntVbe(self, "num_way_nodes")
|
|
|
|
yield IntVbe(self, "first_lat_diff")
|
|
|
|
yield IntVbe(self, "first_lon_diff")
|
|
|
|
|
|
|
|
for i in range(self["num_way_nodes"].value - 1):
|
|
|
|
yield IntVbe(self, "lat_diff[]")
|
|
|
|
yield IntVbe(self, "lon_diff[]")
|
|
|
|
|
|
|
|
|
|
|
|
class TileData(FieldSet):
|
|
|
|
|
|
|
|
def __init__(self, parent, name, zoomIntervalCfg, **kw):
|
|
|
|
FieldSet.__init__(self, parent, name, **kw)
|
|
|
|
self.zoomIntervalCfg = zoomIntervalCfg
|
|
|
|
|
|
|
|
def createFields(self):
|
|
|
|
if self["/have_debug"].value:
|
|
|
|
yield String(self, "signature", 32)
|
|
|
|
if not self['signature'].value.startswith("###TileStart"):
|
|
|
|
raise ValueError
|
|
|
|
|
|
|
|
yield TileHeader(self, "tile_header", self.zoomIntervalCfg)
|
|
|
|
|
|
|
|
numLevels = int(self.zoomIntervalCfg[
|
|
|
|
"max_zoom_level"].value - self.zoomIntervalCfg["min_zoom_level"].value) + 1
|
|
|
|
for zoomLevel in range(numLevels):
|
|
|
|
zoomTableEntry = self["tile_header"][
|
|
|
|
"zoom_table_entry[%d]" % zoomLevel]
|
|
|
|
for poiIndex in range(zoomTableEntry["num_pois"].value):
|
|
|
|
yield POIData(self, "poi_data[%d,%d]" % (zoomLevel, poiIndex))
|
|
|
|
|
|
|
|
for zoomLevel in range(numLevels):
|
|
|
|
zoomTableEntry = self["tile_header"][
|
|
|
|
"zoom_table_entry[%d]" % zoomLevel]
|
|
|
|
for wayIndex in range(zoomTableEntry["num_ways"].value):
|
|
|
|
yield WayProperties(self, "way_props[%d,%d]" % (zoomLevel, wayIndex))
|
|
|
|
|
|
|
|
|
|
|
|
class ZoomSubFile(SeekableFieldSet):
|
|
|
|
|
|
|
|
def __init__(self, parent, name, zoomIntervalCfg, **kw):
|
|
|
|
SeekableFieldSet.__init__(self, parent, name, **kw)
|
|
|
|
self.zoomIntervalCfg = zoomIntervalCfg
|
|
|
|
|
|
|
|
def createFields(self):
|
|
|
|
if self["/have_debug"].value:
|
|
|
|
yield String(self, "signature", 16)
|
|
|
|
if self['signature'].value != "+++IndexStart+++":
|
|
|
|
raise ValueError
|
|
|
|
|
|
|
|
indexEntries = []
|
|
|
|
numTiles = None
|
|
|
|
i = 0
|
|
|
|
while True:
|
|
|
|
entry = TileIndexEntry(self, "tile_index_entry[]")
|
|
|
|
indexEntries.append(entry)
|
|
|
|
yield entry
|
|
|
|
|
|
|
|
i += 1
|
|
|
|
if numTiles is None:
|
|
|
|
# calculate number of tiles (TODO: better calc this from map
|
|
|
|
# bounding box)
|
|
|
|
firstOffset = self["tile_index_entry[0]"]["offset"].value
|
|
|
|
if self["/have_debug"].value:
|
|
|
|
firstOffset -= 16
|
|
|
|
numTiles = firstOffset / 5
|
|
|
|
if i >= numTiles:
|
|
|
|
break
|
|
|
|
|
|
|
|
for i, indexEntry in enumerate(indexEntries):
|
|
|
|
offset = indexEntry["offset"].value
|
|
|
|
self.seekByte(offset, relative=True)
|
|
|
|
if i != len(indexEntries) - 1:
|
|
|
|
next_offset = indexEntries[i + 1]["offset"].value
|
|
|
|
size = (next_offset - offset) * 8
|
|
|
|
else:
|
|
|
|
size = self.size - offset * 8
|
|
|
|
if size == 0:
|
|
|
|
# hachoir doesn't support empty field.
|
|
|
|
continue
|
|
|
|
yield TileData(self, "tile_data[%d]" % i, zoomIntervalCfg=self.zoomIntervalCfg, size=size)
|
|
|
|
|
|
|
|
|
|
|
|
class MapsforgeMapFile(Parser, RootSeekableFieldSet):
|
|
|
|
PARSER_TAGS = {
|
|
|
|
"id": "mapsforge_map",
|
|
|
|
"category": "misc",
|
|
|
|
"file_ext": ("map",),
|
|
|
|
"min_size": 62 * 8,
|
|
|
|
"description": "Mapsforge map file",
|
|
|
|
}
|
|
|
|
|
|
|
|
endian = BIG_ENDIAN
|
|
|
|
|
|
|
|
def validate(self):
|
|
|
|
return self["file_magic"].value == "mapsforge binary OSM" and self["file_version"].value == 3
|
|
|
|
|
|
|
|
def createFields(self):
|
|
|
|
yield String(self, "file_magic", 20)
|
|
|
|
yield UInt32(self, "header_size")
|
|
|
|
yield UInt32(self, "file_version")
|
|
|
|
yield UInt64(self, "file_size")
|
|
|
|
yield UInt64(self, "creation_date")
|
|
|
|
yield Int32(self, "min_lat")
|
|
|
|
yield Int32(self, "min_lon")
|
|
|
|
yield Int32(self, "max_lat")
|
|
|
|
yield Int32(self, "max_lon")
|
|
|
|
yield UInt16(self, "tile_size")
|
|
|
|
yield VbeString(self, "projection")
|
|
|
|
|
|
|
|
# flags
|
|
|
|
yield Bit(self, "have_debug")
|
|
|
|
yield Bit(self, "have_map_start")
|
|
|
|
yield Bit(self, "have_start_zoom")
|
|
|
|
yield Bit(self, "have_language_preference")
|
|
|
|
yield Bit(self, "have_comment")
|
|
|
|
yield Bit(self, "have_created_by")
|
|
|
|
yield Bits(self, "reserved[]", 2)
|
|
|
|
|
|
|
|
if self["have_map_start"].value:
|
|
|
|
yield UInt32(self, "start_lat")
|
|
|
|
yield UInt32(self, "start_lon")
|
|
|
|
if self["have_start_zoom"].value:
|
|
|
|
yield UInt8(self, "start_zoom")
|
|
|
|
if self["have_language_preference"].value:
|
|
|
|
yield VbeString(self, "language_preference")
|
|
|
|
if self["have_comment"].value:
|
|
|
|
yield VbeString(self, "comment")
|
|
|
|
if self["have_created_by"].value:
|
|
|
|
yield VbeString(self, "created_by")
|
|
|
|
|
|
|
|
yield TagStringList(self, "poi_tags")
|
|
|
|
yield TagStringList(self, "way_tags")
|
|
|
|
|
|
|
|
yield UInt8(self, "num_zoom_intervals")
|
|
|
|
for i in range(self["num_zoom_intervals"].value):
|
|
|
|
yield ZoomIntervalCfg(self, "zoom_interval_cfg[]")
|
|
|
|
|
|
|
|
for i in range(self["num_zoom_intervals"].value):
|
|
|
|
zoomIntervalCfg = self["zoom_interval_cfg[%d]" % i]
|
|
|
|
self.seekByte(zoomIntervalCfg[
|
|
|
|
"subfile_start"].value, relative=False)
|
|
|
|
yield ZoomSubFile(self, "subfile[]", size=zoomIntervalCfg["subfile_size"].value * 8, zoomIntervalCfg=zoomIntervalCfg)
|