Accelerating OpenStreetMap Data Import with Hadoop and imposm3
In an offline-map solution built on OpenStreetMap, the OSM data file has to be imported into PostgreSQL. The community provides several tools for this step, among them the relatively fast omniscale/imposm3. The problem remains that the planet-wide OpenStreetMap dataset is simply huge: even with imposm3 the import takes several days.
So we need a faster and more efficient approach.
2. Technical Approach
2.1 The default imposm3 workflow
Let's first look at imposm3's default workflow:

In this workflow, indexes are created while the data is being written. The "optimize indexes" step then runs two PostgreSQL statements, CLUSTER and ANALYZE:

Index creation is slow by itself, and worse, CLUSTER locks the table. From what I observed, after the data has been imported most of the remaining time is spent in this CLUSTER phase. The CLUSTER step in imposm3 is optional (just omit the -optimize flag), but the planet-wide OpenStreetMap dataset is so large that we still want the indexes optimized to get the best query performance.
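For reference, here is a minimal sketch of what that optimize stage roughly boils down to on the PostgreSQL side; the table and index names (osm_roads, osm_roads_geom_idx) are hypothetical examples, not imposm3's actual naming:

```python
# Sketch of the "optimize" stage: CLUSTER each imported table on its geometry index,
# then ANALYZE it. Table/index names below are made up for illustration.
import psycopg2

conn = psycopg2.connect("dbname=osm user=osm")  # assumed connection string
conn.autocommit = True                          # each maintenance statement runs in its own transaction
with conn.cursor() as cur:
    for table, geom_index in [("osm_roads", "osm_roads_geom_idx")]:
        cur.execute(f"CLUSTER {table} USING {geom_index};")  # rewrites the table in index order, takes an exclusive lock
        cur.execute(f"ANALYZE {table};")                     # refreshes planner statistics
conn.close()
```

CLUSTER physically rewrites the table and holds an exclusive lock for the whole rewrite, which is why it dominates the tail of the import.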
In addition, imposm3 is a single-machine tool, so loading and parsing the OSM data and importing the parsed data into the corresponding PostgreSQL tables are also quite slow.
2.2 Overall optimization strategy
I approached the problem in the following stages:

2.3 Writing OSM data in parallel
Based on the figure above, there are a few technical points:
- How do we quickly split a single planet-wide planet.osm.pbf into many pieces?
- How does the mapper stage then import the data into the target PostgreSQL quickly?
2.3.1 How do we quickly split a single planet-wide planet.osm.pbf into many pieces?
2.3.1.1 Splitting principles
By what principle should we split planet.osm.pbf into multiple pieces? There are two approaches:
- Approach 1: The most reasonable scheme is to cut along national borders (for very large countries such as China, Russia, or the United States, cut further at the second administrative level: provinces, states, and so on).
- Approach 2: Cut the longitude/latitude range into tiles with a fixed step size.
Both approaches work, and I have tried both.
2.3.1.2 Cutting along national borders
Fortunately, download.geofabrik.de already provides exactly the kind of split described above: the world cut into individual countries, with the largest countries further cut at the second administrative level. We only need a crawler to fetch those .osm.pbf files:
```js
#!/usr/bin/env node
const _ = require("lodash");
const cheerio = require('cheerio');
const axios = require("axios");
const {URL} = require('url');
const fs = require("fs");
const path = require("path");
const crypto = require('crypto');
const winston = require('winston');
const {SingleBar} = require('cli-progress');

// Optional proxy prefix prepended to every request URL.
const PROXY_PREFIX = "";
const INDEX_PAGE = "https://download.geofabrik.de/";

const logger = winston.createLogger({
    // log level
    level: 'info',
    // log format
    format: winston.format.printf(info => `${info.timestamp} ${info.level}: ${info.message}`),
    // log transports
    transports: [
        // console output
        new winston.transports.Console({
            format: winston.format.combine(
                winston.format.timestamp({format: 'YYYY-MM-DD HH:mm:ss'}),
                winston.format.colorize(),
                winston.format.printf(info => `${info.timestamp} ${info.level}: ${info.message}`)
            )
        })
    ]
});

async function getPage(url) {
    while (true) {
        try {
            let response = await axios.get(`${PROXY_PREFIX}/${url}`, {
                rejectUnauthorized: false,
                userAgent: 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36'
            });
            if (response.status !== 200) { continue; }
            return response.data;
        } catch (e) { }
    }
}

/**
 * Does the page list sub-regions?
 */
async function hasSubRegions(url) {
    if (url === INDEX_PAGE) { return true; }
    const html = await getPage(url);
    const $ = cheerio.load(html);
    return $("#subregions").length > 1;
}

async function getFinalDownloadUrl(currentUrl) {
    let osmUrl = {url: "", md5sum: ""};
    const $ = cheerio.load(await getPage(currentUrl));

    $(".download-main a").each((index, element) => {
        if (osmUrl.url.length > 0) { return; }
        let $element = $(element);
        let href = $element.attr("href");
        if (href.endsWith(".osm.pbf")) {
            const fullUrl = new URL(href, currentUrl);
            osmUrl.url = fullUrl.href;
        }
    });

    $(".download-main a").each((index, element) => {
        if (osmUrl.md5sum.length > 0) { return; }
        let $element = $(element);
        let href = $element.attr("href");
        if (href.endsWith(".osm.pbf.md5")) { osmUrl.md5sum = $element.text(); }
    });

    return osmUrl;
}

async function getDownloadUrls(url) {
    let results = [];

    const isLastPage = !await hasSubRegions(url);
    if (isLastPage) {
        const finalDownloadUrl = await getFinalDownloadUrl(url);
        logger.info(`${url} is last page, osm.pbf: ${finalDownloadUrl.url}`);
        results.push(finalDownloadUrl);
        return results;
    }

    const html = await getPage(url);
    const $ = cheerio.load(html);
    for (const element of $("#subregions:last .subregion a")) {
        const href = $(element).attr("href");
        const fullUrl = new URL(href, url);
        logger.info(`${url} is index page, goto ${fullUrl.href}`);
        for (const r of await getDownloadUrls(fullUrl.href)) { results.push(r); }
    }
    return results;
}

async function downloadFile(url, outputPath) {
    // create a progress bar instance
    const progressBar = new SingleBar({
        format: 'Downloading |{bar}| {percentage}% || {value}/{total} Chunks',
        barCompleteChar: '\u2588',
        barIncompleteChar: '\u2591',
        hideCursor: true
    });

    try {
        // issue the GET request
        const response = await axios({
            method: 'get',
            url: `${PROXY_PREFIX}/${url}`,
            responseType: 'stream',
            rejectUnauthorized: false,
            userAgent: 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/129.0.0.0 Safari/537.36'
        });

        // prepare the output file stream
        const writer = fs.createWriteStream(outputPath);

        // total content length and progress bar
        const totalLength = response.headers['content-length'];
        progressBar.start(totalLength, 0);

        let receivedLength = 0;
        // update the progress bar as data arrives
        response.data.on('data', (chunk) => {
            receivedLength += chunk.length;
            progressBar.update(receivedLength);
        });

        // pipe the data into the file
        response.data.pipe(writer);

        return new Promise((resolve, reject) => {
            writer.on('finish', () => { progressBar.stop(); resolve(true); });
            writer.on('error', (err) => { progressBar.stop(); reject(false); });
            // handle response errors
            response.data.on('error', (err) => { progressBar.stop(); writer.close(); reject(false); });
        });
    } catch (error) {
        progressBar.stop();
        // remove the incomplete file
        if (fs.existsSync(outputPath)) { fs.unlinkSync(outputPath); }
        return false;
    }
}

async function getMD5SUM(filename) {
    return new Promise((resolve, reject) => {
        const stream = fs.createReadStream(filename);
        const hash = crypto.createHash('md5');
        stream.on('data', chunk => { hash.update(chunk, 'utf8'); });
        stream.on('end', () => { resolve(hash.digest('hex')); });
        stream.on('error', reject);
    });
}

async function main() {
    // create the output directory before resolving it (realpathSync fails on a missing path)
    if (!fs.existsSync(process.argv[2])) { fs.mkdirSync(process.argv[2], {recursive: true, mode: 0o755}); }
    const outputDir = fs.realpathSync(process.argv[2]);

    const downloads = await getDownloadUrls(INDEX_PAGE);
    for (const osm of downloads) {
        logger.info(`Downloading ${osm.url}`);
        const targetFilename = _.join(_.split(_.replace(_.replace(new URL(osm.url).pathname, /^\//, ""), "-latest", ""), "/"), "-");
        const targetFile = path.join(outputDir, targetFilename);
        while (true) {
            try {
                const success = await downloadFile(osm.url, targetFile);
                if (!success) { continue; }
                const calValue = await getMD5SUM(targetFile);
                if (calValue !== osm.md5sum) {
                    logger.error(`${targetFile} md5sum mismatch, expected: ${osm.md5sum}, actual: ${calValue}`);
                    continue;
                }
            } catch (e) { }
            break;
        }
    }
}

main().then(async () => {
    logger.info("Download finished");
}).catch((e) => {
    logger.error(`Download failed: ${e}`);
});
```

Alternatively, if you want to do the cutting yourself, geoboundaries also provides boundary data down to the third administrative level, which can be used directly. The splitter below reads such GeoJSON boundaries and cuts planet.osm.pbf along them:
```cpp
#include <iostream>
#include <fstream>
#include <filesystem>
#include <utility>
#include <chrono>
#include <cstdlib>
#include <cmath>
#include <random>
#include <thread>
#include <mutex>
#include <fcntl.h>   // ::open for the on-disk node index
#include <unistd.h>  // close
#include <argparse/argparse.hpp>
#include <nlohmann/json.hpp>
#include <spdlog/spdlog.h>
#include <fmt/core.h>
#include <boost/geometry.hpp>
#include <boost/geometry/geometries/point.hpp>
#include <boost/geometry/geometries/polygon.hpp>
#include <boost/algorithm/string.hpp>
#include <osmium/io/any_input.hpp>
#include <osmium/io/any_output.hpp>
#include <osmium/io/reader_with_progress_bar.hpp>
#include <osmium/io/writer.hpp>
#include <osmium/io/pbf_input.hpp>
#include <osmium/io/file.hpp>
#include <osmium/util/progress_bar.hpp>
#include <osmium/util/verbose_output.hpp>
#include <osmium/util/file.hpp>
#include <osmium/index/map/all.hpp>
#include <osmium/index/map/sparse_mem_array.hpp>
#include <osmium/index/map/dense_file_array.hpp>
#include <osmium/handler/node_locations_for_ways.hpp>
#include <osmium/geom/haversine.hpp>
#include <osmium/geom/coordinates.hpp>
#include <osmium/area/assembler.hpp>
#include <osmium/area/multipolygon_manager.hpp>
#include <osmium/area/multipolygon_collector.hpp>
#include <osmium/builder/osm_object_builder.hpp>
#include <osmium/memory/buffer.hpp>
#include <osmium/visitor.hpp>

using json = nlohmann::json;
// on-disk node-location index shared by all worker threads
using index_type = osmium::index::map::SparseFileArray<osmium::unsigned_object_id_type, osmium::Location>;
using location_handler_type = osmium::handler::NodeLocationsForWays<index_type>;
namespace bg = boost::geometry;
typedef bg::model::point<double, 2, bg::cs::cartesian> Point;
typedef bg::model::polygon<Point> Polygon;
enum ADMLevel {
    ADM0 = 0,
    ADM1,
    ADM2,
    ADM99,
};

class IGeoJSONFeature {
public:
    virtual bool contains(double x, double y) = 0;

    virtual std::string get_shape_name() = 0;

    virtual ADMLevel get_adm_level() = 0;

public:
    virtual ~IGeoJSONFeature() {};
};

// A named administrative area: contains() is true when the point lies inside one of its polygons.
class InnerGeoJSONFeature : public IGeoJSONFeature {
public:
    ADMLevel admLevel;
    std::string shapeName;
    std::vector<Polygon *> coordinates;

public:
    virtual ~InnerGeoJSONFeature() {};

public:
    bool contains(double x, double y) override {
        Point p(x, y);
        bool inside = false;
        for (Polygon *iter: coordinates) {
            if (bg::within(p, *iter)) {
                inside = true;
                break;
            }
        }
        return inside;
    }

    std::string get_shape_name() override { return shapeName; }

    ADMLevel get_adm_level() override { return admLevel; }
};

// The catch-all "others" area: contains() is true when the point lies outside every known polygon.
class OuterGeoJSONFeature : public IGeoJSONFeature {
public:
    ADMLevel admLevel{ADM99};
    std::string shapeName{"others"};
    std::vector<Polygon *> coordinates;

public:
    virtual ~OuterGeoJSONFeature() {}

public:
    bool contains(double x, double y) override {
        Point p(x, y);
        bool inside = false;
        for (Polygon *iter: coordinates) {
            if (bg::within(p, *iter)) {
                inside = true;
                break;
            }
        }
        return !inside;
    }

    std::string get_shape_name() override { return shapeName; }

    ADMLevel get_adm_level() override { return admLevel; }
};
// Load all features from a GeoJSON file into geoJsonFeatures, tagged with the given admin level.
void parse_geojson(const std::string &geojson_path, ADMLevel admLevel, std::vector<IGeoJSONFeature *> *geoJsonFeatures) {
    std::ifstream geojson_file(geojson_path);
    json geojson;
    geojson_file >> geojson;

    for (auto &feature: geojson["features"]) {
        InnerGeoJSONFeature *geoJsonFeature = new InnerGeoJSONFeature();
        geoJsonFeatures->push_back(geoJsonFeature);
        geoJsonFeature->admLevel = admLevel;
        geoJsonFeature->shapeName = feature["properties"]["shapeName"];
        auto &geometry = feature["geometry"];
        if (geometry["type"] == "Polygon") {
            Polygon *polygon = new Polygon();
            for (auto &level1: geometry["coordinates"]) {
                for (auto &level2: level1) {
                    bg::append(polygon->outer(), Point(level2[0], level2[1]));
                }
            }
            bg::correct(*polygon);
            geoJsonFeature->coordinates.push_back(polygon);
        } else if (geometry["type"] == "MultiPolygon") {
            for (auto &level0: geometry["coordinates"]) {
                Polygon *polygon = new Polygon();
                for (auto &level1: level0) {
                    for (auto &level2: level1) {
                        bg::append(polygon->outer(), Point(level2[0], level2[1]));
                    }
                }
                bg::correct(*polygon);
                geoJsonFeature->coordinates.push_back(polygon);
            }
        }
    }
    geojson_file.close();
}

std::string join_paths(const std::string &path1, const std::string &path2) {
    if (path1.empty()) return path2;
    if (path2.empty()) return path1;

    std::filesystem::path p1(path1);
    std::filesystem::path p2(path2);
    return (p1 / p2).string();
}

// First pass: cache every node location into the on-disk index so that ways and
// relations can later be resolved to coordinates.
class CacheLocationHandler : public osmium::handler::Handler {
private:
    location_handler_type &location_handler;
public:
    CacheLocationHandler(location_handler_type &loc_handler) : location_handler(loc_handler) {}

    void node(const osmium::Node &node) {
        osmium::Location location = node.location();
        if (!location.valid()) { return; }
        location_handler.node(node);
    }

    void way(const osmium::Way &way) {
        if (way.nodes().empty()) { return; }
        location_handler.way(const_cast<osmium::Way &>(way));
    }

    void relation(const osmium::Relation &relation) {
        if (relation.members().empty()) { return; }
        location_handler.relation(relation);
    }
};
// Second pass: route every node/way/relation into the .osm.pbf "basket" of the area that contains it.
class MultiWriterHandler : public osmium::handler::Handler {
private:
    std::vector<IGeoJSONFeature *> primary_features;
    std::vector<IGeoJSONFeature *> secondary_features;
    std::vector<IGeoJSONFeature *> other_features;
    std::vector<osmium::io::Writer *> primary_writers;
    std::vector<osmium::io::Writer *> secondary_writers;
    std::vector<osmium::io::Writer *> other_writers;
    location_handler_type &location_handler;

public:
    MultiWriterHandler(
            const std::int32_t thread_id,
            const std::vector<std::string> &output_dirs,
            const std::vector<IGeoJSONFeature *> *features,
            location_handler_type &loc_handler
    ) : location_handler(loc_handler) {

        for (const auto &feature: *features) {
            if (feature->get_adm_level() == ADM0) {
                primary_features.push_back(feature);
            } else if (feature->get_adm_level() == ADM1) {
                secondary_features.push_back(feature);
            } else if (feature->get_adm_level() == ADM99) {
                other_features.push_back(feature);
            }
        }

        // spread output files randomly across the output directories
        std::random_device rd;
        std::default_random_engine eng(rd());
        std::uniform_int_distribution<> distr(0, output_dirs.size() - 1);

        for (const auto &feature: primary_features) {
            std::string shape_name = feature->get_shape_name();
            boost::replace_all(shape_name, " ", "-");
            std::string pbf_filename = fmt::format("{}_{}.osm.pbf", shape_name, thread_id);
            this->primary_writers.push_back(new osmium::io::Writer(join_paths(output_dirs[distr(eng)], pbf_filename), osmium::io::overwrite::allow));
        }
        for (const auto &feature: secondary_features) {
            std::string shape_name = feature->get_shape_name();
            boost::replace_all(shape_name, " ", "-");
            std::string pbf_filename = fmt::format("{}_{}.osm.pbf", shape_name, thread_id);
            this->secondary_writers.push_back(new osmium::io::Writer(join_paths(output_dirs[distr(eng)], pbf_filename), osmium::io::overwrite::allow));
        }
        for (const auto &feature: other_features) {
            std::string pbf_filename = fmt::format("{}_{}.osm.pbf", feature->get_shape_name(), thread_id);
            this->other_writers.push_back(new osmium::io::Writer(join_paths(output_dirs[distr(eng)], pbf_filename), osmium::io::overwrite::allow));
        }
    }

    ~MultiWriterHandler() {
        for (auto &writer: primary_writers) { writer->close(); delete writer; }
        for (auto &writer: secondary_writers) { writer->close(); delete writer; }
        for (auto &writer: other_writers) { writer->close(); delete writer; }
    }

    // Write the object into the basket whose area contains the given location.
    bool pick_to_basket(const osmium::Location &location, const osmium::OSMObject &object) {
        if (!location.valid()) { return false; }
        double lon = location.lon();
        double lat = location.lat();
        for (std::size_t i = 0; i < secondary_features.size(); i++) {
            IGeoJSONFeature *feature = secondary_features.at(i);
            if (feature->contains(lon, lat)) {
                (*secondary_writers[i])(object);
                return true;
            }
        }

        for (std::size_t i = 0; i < primary_features.size(); i++) {
            IGeoJSONFeature *feature = primary_features.at(i);
            if (feature->contains(lon, lat)) {
                (*primary_writers[i])(object);
                return true;
            }
        }

        // for (std::size_t i = 0; i < other_features.size(); i++) {
        //     IGeoJSONFeature *feature = other_features.at(i);
        //     if (feature->contains(lon, lat)) {
        //         (*other_writers[i])(object);
        //         return true;
        //     }
        // }
        return false;
    }

    void node(const osmium::Node &node) {
        osmium::Location location = node.location();
        pick_to_basket(location, node);
    }

    // A way follows the first of its nodes that falls into a basket.
    void way(const osmium::Way &way) {
        if (way.nodes().empty()) { return; }
        for (const auto &node_ref: way.nodes()) {
            osmium::Location loc = location_handler.get_node_location(node_ref.ref());
            bool picked = pick_to_basket(loc, way);
            if (picked) { break; }
        }
    }

    // A relation follows the first node or way member that can be resolved to a basket.
    void relation(const osmium::Relation &relation) {
        if (relation.members().empty()) { return; }
        for (const auto &member: relation.members()) {
            if (member.type() == osmium::item_type::node || member.type() == osmium::item_type::way) {
                osmium::Location loc = location_handler.get_node_location(member.ref());
                if (pick_to_basket(loc, relation)) { break; }
            }
        }
    }
};

void process_boxes_chunk(
        const std::int32_t thread_id,
        const std::vector<std::string> &output_dirs,
        location_handler_type &location_handler,
        const std::vector<IGeoJSONFeature *> &features_chunk,
        const std::string &input_pbf,
        std::mutex &output_mutex) {
    osmium::io::File input_file(input_pbf);
    osmium::io::Reader reader(input_file, osmium::osm_entity_bits::all);
    osmium::io::Header header;
    MultiWriterHandler handler(thread_id, output_dirs, &features_chunk, location_handler);
    osmium::apply(reader, handler);
    reader.close();

    std::lock_guard<std::mutex> lock(output_mutex);
}
int main(int argc, char *argv[]) {
    spdlog::set_level(spdlog::level::info);
    spdlog::set_pattern("%+");

    argparse::ArgumentParser program("split an .osm.pbf file by boundary/bbox");

    program.add_argument("--input")
            .required()
            .help("path of the input PBF file");

    program.add_argument("--output-dir")
            .required()
            .nargs(argparse::nargs_pattern::at_least_one)
            .help("output directories (more than one allowed)");

    program.add_argument("--geojson")
            .required()
            .nargs(argparse::nargs_pattern::at_least_one)
            .help("GeoJSON boundary files");
    program.add_argument("--threads-num").scan<'i', int>().default_value(20).help("number of threads");

    program.add_argument("--use-cache").default_value(false).implicit_value(true).help("reuse an existing node-location cache");
    program.add_argument("--index-file").help("path of the node-location index file");

    try {
        program.parse_args(argc, argv);
    } catch (const std::runtime_error &err) {
        std::cerr << err.what() << std::endl;
        std::cerr << program;
        return EXIT_FAILURE;
    }

    std::string input_pbf = program.get<std::string>("--input");
    std::vector<std::string> output_dirs = program.get<std::vector<std::string>>("--output-dir");
    std::vector<std::string> geojson_files = program.get<std::vector<std::string>>("--geojson");
    int threads_num = program.get<int>("--threads-num");
    bool use_cache = program.get<bool>("--use-cache");
    std::string index_path;
    if (use_cache) {
        index_path = program.get<std::string>("--index-file");
        if (!std::filesystem::exists(index_path)) {
            SPDLOG_ERROR("index file {} does not exist", index_path);
            return EXIT_FAILURE;
        }
    } else {
        index_path = join_paths(output_dirs.at(0), "index.idx");
        if (std::filesystem::exists(index_path)) { std::remove(index_path.c_str()); }
    }

    if (!std::filesystem::exists(input_pbf)) {
        SPDLOG_ERROR("input file {} does not exist", input_pbf);
        return EXIT_FAILURE;
    }

    for (auto &output_dir: output_dirs) {
        if (!std::filesystem::exists(output_dir)) {
            SPDLOG_INFO("output directory {} does not exist, creating it", output_dir);
            std::filesystem::create_directories(output_dir);
        }
    }

    if (geojson_files.size() > 2) {
        SPDLOG_ERROR("only levels up to ADM1 are supported for now");
        return EXIT_FAILURE;
    }

    std::vector<IGeoJSONFeature *> geojson_features;
    std::int32_t i = 0;
    for (i = 0; i < std::min((int) geojson_files.size(), 2); i++) {
        ADMLevel admLevel;
        if (i == 0) {
            admLevel = ADMLevel::ADM0;
        } else if (i == 1) {
            admLevel = ADMLevel::ADM1;
        } else {
            admLevel = ADMLevel::ADM2;
        }
        parse_geojson(geojson_files.at(i), admLevel, &geojson_features);
    }
    // the "others" feature collects everything that falls outside all known polygons
    OuterGeoJSONFeature *outerGeoJsonFeature = new OuterGeoJSONFeature();
    for (IGeoJSONFeature *feature: geojson_features) {
        InnerGeoJSONFeature *innerGeoJsonFeature = dynamic_cast<InnerGeoJSONFeature *>(feature);
        std::copy(innerGeoJsonFeature->coordinates.begin(), innerGeoJsonFeature->coordinates.end(), std::back_inserter(outerGeoJsonFeature->coordinates));
    }
    geojson_features.push_back(outerGeoJsonFeature);

    SPDLOG_INFO("GeoJSON loaded, {} features in total", geojson_features.size());

    int fd;
    if (use_cache) {
        fd = ::open(index_path.c_str(), O_RDWR, 0666);
    } else {
        fd = ::open(index_path.c_str(), O_RDWR | O_CREAT | O_EXCL, 0666);
    }
    if (fd == -1) {
        SPDLOG_ERROR("open {} failed", index_path);
        return EXIT_FAILURE;
    }
    index_type index{fd};
    location_handler_type location_handler{index};
    location_handler.ignore_errors();
    if (!use_cache) {
        auto start = std::chrono::system_clock::now();

        SPDLOG_INFO("start caching node locations");
        osmium::io::File input_file(input_pbf);
        osmium::io::Reader reader(input_file, osmium::osm_entity_bits::all);
        osmium::io::Header header;
        CacheLocationHandler handler(location_handler);
        osmium::apply(reader, handler);
        reader.close();

        auto end = std::chrono::system_clock::now();
        // elapsed time in seconds
        std::chrono::duration<double> elapsed_seconds = end - start;
        SPDLOG_INFO("node locations cached, took {} seconds", elapsed_seconds.count());
    }

    threads_num = std::min(threads_num, static_cast<int>(geojson_features.size()));
    SPDLOG_INFO("will generate {} osm.pbf files, by {} threads", geojson_features.size(), threads_num);

    std::vector<std::thread> threads;
    std::mutex output_mutex;  // serializes log output across threads
    int chunk_size = static_cast<int>(geojson_features.size()) / threads_num;  // features handled per thread

    for (i = 0; i < threads_num; ++i) {
        std::int32_t start_index = i * chunk_size;
        std::int32_t end_index = start_index + chunk_size;
        if (i != (threads_num - 1)) {
            SPDLOG_INFO("start_index: {}, end_index: {}, chunk_size: {}", start_index, end_index, chunk_size);
            std::vector<IGeoJSONFeature *> boxes_chunk(geojson_features.begin() + start_index, geojson_features.begin() + end_index);
            threads.emplace_back(process_boxes_chunk, i, std::ref(output_dirs), std::ref(location_handler), boxes_chunk, std::ref(input_pbf), std::ref(output_mutex));
        } else {
            SPDLOG_INFO("start_index: {}, to end", start_index);
            std::vector<IGeoJSONFeature *> boxes_chunk(geojson_features.begin() + start_index, geojson_features.end());
            threads.emplace_back(process_boxes_chunk, i, std::ref(output_dirs), std::ref(location_handler), boxes_chunk, std::ref(input_pbf), std::ref(output_mutex));
        }
    }

    // wait for all threads to finish
    for (auto &thread: threads) {
        if (thread.joinable()) { thread.join(); }
    }

    SPDLOG_INFO("all osm.pbf files generated, begin to clean cache and free memory");
    for (auto feature: geojson_features) {
        if (InnerGeoJSONFeature *innerGeoJsonFeature = dynamic_cast<InnerGeoJSONFeature *>(feature)) {
            for (auto p: innerGeoJsonFeature->coordinates) { delete p; }
        }
        delete feature;
    }
    if (!use_cache && std::filesystem::exists(index_path)) { std::remove(index_path.c_str()); }
    close(fd);

    SPDLOG_INFO("all splits completed");
    return EXIT_SUCCESS;
}
```

2.3.1.3 Cutting by bbox
There is a tool on GitHub, osmcode/osmium-tool, that can extract the data inside a given longitude/latitude range (bbox) from OpenStreetMap data (planet.osm.pbf). The problem is that if we use osmium-tool's bbox extraction directly, walking the longitude range -180 to 180 and the latitude range -90 to 90 with a fixed step (10 or 20 degrees) as a Cartesian product, the overall time complexity is O(n^2). In my own tests, with a step of 10 degrees this Cartesian-product cutting takes about a week; even with multi-core parallel cutting it still takes at least 3 days.
We therefore have to bring this cost down to something close to O(1): split all tiles in a single pass over the input instead of one full scan per tile.
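For context, this is roughly what the naive per-tile extraction looks like; the file names and the 10-degree step are only illustrative:

```python
# Sketch of the naive per-tile approach described above: enumerate bbox tiles with a
# fixed step and run `osmium extract` once per tile. Every call re-reads the whole
# planet file, which is exactly why the total cost explodes with the number of tiles.
import subprocess

STEP = 10  # degrees
for lon in range(-180, 180, STEP):
    for lat in range(-90, 90, STEP):
        bbox = f"{lon},{lat},{lon + STEP},{lat + STEP}"  # LEFT,BOTTOM,RIGHT,TOP
        out = f"tile_{lon}_{lat}.osm.pbf"
        subprocess.run(
            ["osmium", "extract", "-b", bbox, "planet.osm.pbf", "-o", out, "--overwrite"],
            check=True,
        )
```

The splitter described here instead reads the input once and routes each object to the right basket on the fly.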

OpenStreetMap's geographic data is essentially an XML format containing three kinds of elements: nodes (Node), ways (Way), and relations (Relation). These three elements are the building blocks of an OSM map; they are tightly related and together describe the geographic features and entities on the map. Here is a sample:
```xml
<?xml version='1.0' encoding='UTF-8'?>
<osm version="0.6" generator="osmconvert 0.8.10">
  <node id="3387792987" lat="30.2749727" lon="120.186591" version="2" timestamp="2015-04-12T13:21:35Z" changeset="0"/>
  <way id="331677437" version="9" timestamp="2024-05-25T04:06:18Z" changeset="0">
    <nd ref="3387792987"/>
    <nd ref="3452151973"/>
    <nd ref="3463697268"/>
  </way>
  <relation id="17634728" version="1" timestamp="2024-05-25T04:06:18Z" changeset="0">
    <member type="way" ref="288874844" role="part"/>
    <member type="relation" ref="9346819" role="part"/>
    <member type="way" ref="28907128" role="part"/>
    <member type="way" ref="331677437" role="part"/>
    <member type="way" ref="506752910" role="part"/>
    <member type="way" ref="482783619" role="part"/>
    <tag k="name" v="浙江大学"/>
    <tag k="name:en" v="Zhejiang University"/>
    <tag k="name:zh-Hant" v="浙江大學"/>
    <tag k="short_name:en" v="ZJU"/>
    <tag k="site" v="university"/>
    <tag k="type" v="site"/>
    <tag k="wikidata" v="Q197543"/>
    <tag k="wikipedia" v="zh:浙江大学"/>
  </relation>
</osm>
```

- Node:
  - A node is one of the basic elements of an OSM map and represents a concrete position on the map.
  - Each node is defined by a unique ID and a pair of longitude/latitude coordinates.
  - A node can exist on its own, representing a specific place such as a point of interest (POI), or be combined with other nodes into more complex structures such as ways or polygons.
- Way:
  - A way is a linear structure formed by connecting two or more nodes in a specific order.
  - A way can represent a physical feature such as a road, a river, or the outline of a park.
  - A way can be an open line or a closed ring; closed ways are typically used to define areas (such as the boundary of a building or a lake).
- Relation:
  - A relation is the OpenStreetMap structure for describing complex relationships between elements.
  - A relation can contain nodes, ways, and other relations, and each member's part in the relation is defined by a "role".
  - Common relation types include bus routes (made up of several ways and stop nodes), restrictions (such as turn restrictions), and multi-part polygons (for example, a country made up of several islands).
Taking the XML above as an example: the relation with id 17634728 is named 浙江大学 (Zhejiang University); one of its member ways has id 331677437, and one of the nodes of that way is 3387792987. Therefore the relation 17634728 and the way 331677437 must both be put into the same basket as the node 3387792987, as shown below:

In other words, it is not enough to sort each Node into its own bbox basket; the Ways and Relations that reference a Node must also go into the baskets of their associated Nodes, as sketched below.
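As a toy illustration of this propagation rule (the MultiWriterHandler in the C++ splitter above implements the same idea), assuming purely hypothetical in-memory structures:

```python
# Toy sketch of the basket rule: a way or relation goes into the basket of the first
# member node whose location falls inside that basket's region. node_basket and baskets
# are hypothetical stand-ins for the real spatial lookup and per-basket writers.
def assign(node_basket, ways, relations, baskets):
    for way in ways:
        for node_id in way["nodes"]:
            b = node_basket.get(node_id)
            if b is not None:
                baskets[b].append(way)   # the way follows its first resolvable node
                break
    for rel in relations:
        for member in rel["members"]:    # in this toy version only node members resolve
            b = node_basket.get(member.get("ref"))
            if b is not None:
                baskets[b].append(rel)   # the relation follows its first resolvable member
                break
```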
2.3.2 Importing data into the target PostgreSQL quickly in the mapper stage
After the planet-wide OpenStreetMap data has been split into many pieces as described above, we build concurrent tasks on Hadoop, so the split files are first staged on HDFS. Each mapper starts by reading its split file from HDFS; the core logic of a single mapper is shown in the figure below.
The core mapper logic is as follows (a sketch appears after the list):
- Each mapper starts its own PostgreSQL instance to receive the data imported by this sub-task.
- imposm3 is still used to import that mapper's osm.pbf into this PostgreSQL instance (without creating indexes).
- The successfully imported data is then exported to local disk with PostgreSQL's COPY statement.
- The temporary PostgreSQL instance is no longer needed and can be destroyed.
- The data cached on local disk is then loaded into the target PostgreSQL with COPY.
- Finally, the indexes are optimized.
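Below is a minimal sketch of that mapper flow. The paths, table names, connection strings and exact command lines are assumptions for illustration, not the production script:

```python
# Sketch of one mapper: throwaway local PostgreSQL -> imposm3 import -> COPY out to
# local disk -> destroy the instance -> COPY into the target PostgreSQL.
# /tmp/pgdata, split.osm.pbf, the table names and DSNs are all hypothetical.
import subprocess
import psycopg2

TABLES = ["osm_roads", "osm_buildings"]  # hypothetical imposm3 output tables

# 1. start a throwaway PostgreSQL instance owned by this mapper (initdb/config omitted)
subprocess.run(["pg_ctl", "-D", "/tmp/pgdata", "-o", "-p 5555", "start"], check=True)

# 2. import this mapper's split with imposm3, without index optimization
subprocess.run([
    "imposm", "import", "-mapping", "mapping.yml",
    "-read", "split.osm.pbf", "-write",
    "-connection", "postgis://osm:osm@127.0.0.1:5555/osm",
], check=True)

# 3. dump the imported rows to local disk with COPY
local = psycopg2.connect("host=127.0.0.1 port=5555 dbname=osm user=osm")
with local.cursor() as cur:
    for t in TABLES:
        with open(f"/tmp/{t}.copy", "wb") as f:
            cur.copy_expert(f"COPY {t} TO STDOUT WITH (FORMAT binary)", f)
local.close()

# 4. the temporary instance is no longer needed
subprocess.run(["pg_ctl", "-D", "/tmp/pgdata", "stop"], check=True)

# 5. load the dumped files into the target PostgreSQL with COPY
target = psycopg2.connect("host=target-db dbname=osm user=osm")
with target.cursor() as cur:
    for t in TABLES:
        with open(f"/tmp/{t}.copy", "rb") as f:
            cur.copy_expert(f"COPY {t} FROM STDIN WITH (FORMAT binary)", f)
target.commit()
target.close()
```

The point of the local COPY round-trip is that the target database only ever sees bulk COPY streams rather than many concurrent row-by-row inserts.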
2.3.3 Summary of the overall flow
The key change is moving from a single machine to a Hadoop-based parallel import, which greatly improves import performance. Beyond that, a lot of supporting work happens before and after the import itself; the overall flow is:

In addition, to squeeze the most out of the whole pipeline, several optimizations are applied before and after the import (a sketch follows the list):
- Before the import starts:
  - Disable autovacuum
  - Clear the existing data in all tables
  - Drop all indexes
  - If the database is distributed, make sure every table is a distributed table
- After the import finishes:
  - Re-enable autovacuum
  - Create indexes in parallel
  - Optimize the indexes
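A sketch of this housekeeping, again with hypothetical table and index names; "parallel index creation" is modeled here simply as one CREATE INDEX per worker connection:

```python
# Pre/post-import housekeeping around the bulk load. Table and index definitions are
# illustrative; the real pipeline drives this from its own metadata.
import psycopg2
from concurrent.futures import ThreadPoolExecutor

TABLES = ["osm_roads", "osm_buildings"]  # hypothetical
INDEXES = [("osm_roads_geom_idx", "osm_roads USING gist (geometry)"),
           ("osm_buildings_geom_idx", "osm_buildings USING gist (geometry)")]
DSN = "host=target-db dbname=osm user=osm"

def before_import():
    with psycopg2.connect(DSN) as conn, conn.cursor() as cur:
        for t in TABLES:
            cur.execute(f"ALTER TABLE {t} SET (autovacuum_enabled = false);")  # disable autovacuum
            cur.execute(f"TRUNCATE {t};")                                      # clear existing data
        for name, _ in INDEXES:
            cur.execute(f"DROP INDEX IF EXISTS {name};")                       # drop all indexes

def _create_index(args):
    name, definition = args
    conn = psycopg2.connect(DSN)
    conn.autocommit = True
    with conn.cursor() as cur:
        cur.execute(f"CREATE INDEX {name} ON {definition};")
    conn.close()

def after_import():
    # build the indexes in parallel, one per connection
    with ThreadPoolExecutor(max_workers=4) as pool:
        list(pool.map(_create_index, INDEXES))
    conn = psycopg2.connect(DSN)
    conn.autocommit = True
    with conn.cursor() as cur:
        for (name, _), t in zip(INDEXES, TABLES):
            cur.execute(f"CLUSTER {t} USING {name};")   # optimize physical row order
            cur.execute(f"ANALYZE {t};")                # refresh statistics
        for t in TABLES:
            cur.execute(f"ALTER TABLE {t} RESET (autovacuum_enabled);")  # re-enable autovacuum
    conn.close()
```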
With all of these optimizations combined, converting the planet-wide OpenStreetMap data for tile generation went from more than a week to about 2.5 hours, a dramatic performance improvement.