import { Injectable, Logger } from '@nestjs/common';
import { InjectDataSource, InjectRepository } from '@nestjs/typeorm';
import {
  DataSource,
  FindOptionsWhere,
  ILike,
  In,
  Repository,
} from 'typeorm';
import { pipeline } from 'stream/promises';
import { join } from 'path';
import { GeonameQuery, WorkDirectory } from './geonames.interfaces';
import { createInterface } from 'readline';
import { Geoname } from './geonames.entity';
import { intOrNull } from 'src/utils/int-or-null';
import * as fs from 'fs';
import * as unzipper from 'unzipper';

/** URL of the zipped dump that `downloadGeodump` fetches. */
const GEONAMES_DUMP =
  'https://download.geonames.org/export/dump/allCountries.zip';

/** Logger context used for every message emitted by the importer. */
const context = 'GeonamesImport';

/** Name of the zip entry that is extracted from the dump. */
const DUMP_ENTRY = 'allCountries.txt';

/** Number of saved rows after which the running transaction is committed. */
const IMPORT_BATCH_SIZE = 100;

/** Query keys that control the search itself and are never used as filters. */
const RESERVED_QUERY_KEYS = ['fields', 'limit', 'q', 'offset'];

/** Field names that may be selected or used as equality filters in `search`. */
const ACCEPT_FIELDS = [
  'geonameid',
  'name',
  'asciiname',
  'alternatenames',
  'latitude',
  'longitude',
  'featureclass',
  'featurecode',
  'countrycode',
  'cc2',
  'admin1code',
  'admin2code',
  'admin3code',
  'admin4code',
  'population',
  'elevation',
  'dem',
  'timezone',
  'moddate',
];

/** Shape of a single `where` clause built by `search`. */
type ReduceType = FindOptionsWhere<Geoname>;

/**
 * Parses a decimal column, returning `null` only when the text is not a
 * number. Unlike `parseFloat(x) || null`, a legitimate `0` is preserved.
 */
function floatOrNull(value: string | undefined): number | null {
  const parsed = parseFloat(value ?? '');
  return Number.isNaN(parsed) ? null : parsed;
}

/**
 * Search over, and import of, the geonames table in the `geobase` data source.
 */
@Injectable()
export class GeonamesService {
  constructor(
    @InjectDataSource('geobase') private dataSource: DataSource,
    @InjectRepository(Geoname, 'geobase')
    private geonameRepository: Repository<Geoname>,
  ) {}

  /**
   * Normalises the requested field list to an array and drops every name that
   * is not listed in `ACCEPT_FIELDS`.
   */
  private mapAllowedQuery(select: string | string[]) {
    return (Array.isArray(select) ? select : [select]).filter((field) =>
      ACCEPT_FIELDS.includes(field),
    ) as unknown as (keyof Geoname)[];
  }

  /**
   * Finds geonames matching the query.
   *
   * - Every key of `params` that is listed in `ACCEPT_FIELDS` becomes an
   *   equality filter (or an `IN` filter when its value is an array).
   * - `q` adds a case-insensitive substring match on `name`; with the `ext:`
   *   prefix the match is OR-ed over `name`, `asciiname` and `alternatenames`.
   * - `limit` defaults to 50 and is clamped to 1..1000; `offset` defaults to 0
   *   and is never negative.
   *
   * @param params Query parameters as received from the caller.
   * @returns The matching rows, restricted to the allowed selected fields.
   */
  async search(params: GeonameQuery): Promise<Geoname[]> {
    const select = this.mapAllowedQuery(params.fields || ACCEPT_FIELDS);

    const filters = Object.keys(params)
      .filter(
        (key) =>
          !RESERVED_QUERY_KEYS.includes(key) && ACCEPT_FIELDS.includes(key),
      )
      .reduce(
        (obj, key) => ({
          ...obj,
          [key]: Array.isArray(params[key]) ? In(params[key]) : params[key],
        }),
        {},
      ) as ReduceType;

    let where: ReduceType | ReduceType[] = filters;

    if (params.q) {
      const extended = params.q.startsWith('ext:');
      const searchTerm = extended ? params.q.substring(4) : params.q;
      // Each clause keeps the equality filters and adds one ILIKE condition.
      const withLike = (field: string) =>
        ({ ...filters, [field]: ILike(`%${searchTerm}%`) }) as ReduceType;

      // An array of clauses is OR-ed together by TypeORM.
      where = extended
        ? ['name', 'asciiname', 'alternatenames'].map(withLike)
        : withLike('name');
    }

    const take = Math.max(
      Math.min(intOrNull(params.limit as string) || 50, 1000),
      1,
    );
    // A negative offset would be rejected by the database, so clamp it.
    const skip = Math.max(intOrNull(params.offset as string) || 0, 0);

    return this.geonameRepository.find({
      select,
      where,
      skip,
      take,
    });
  }

  /**
   * Creates a fresh `.gntmp-*` directory under the current working directory.
   *
   * @returns The directory path and a `remove` callback that deletes it
   *   recursively.
   */
  async workDirectory(): Promise<WorkDirectory> {
    const path = await fs.promises.mkdtemp(join(process.cwd(), '.gntmp-'));
    return {
      path,
      remove: async () => {
        await fs.promises.rm(path, { recursive: true, force: true });
      },
    };
  }

  /**
   * Downloads the geonames dump and extracts `allCountries.txt` into
   * `<path>/out.txt`.
   *
   * @param param0 Work directory to write into.
   * @returns Path of the fully written extracted file.
   * @throws Error when the HTTP request fails, or when the archive does not
   *   contain the expected entry.
   */
  async downloadGeodump({ path }: WorkDirectory): Promise<string> {
    const httpStream = await fetch(GEONAMES_DUMP);
    if (!httpStream.ok || !httpStream.body) {
      throw new Error(
        `Failed to download ${GEONAMES_DUMP}: HTTP ${httpStream.status}`,
      );
    }

    const outfile = join(path, 'out.txt');
    const output = fs.createWriteStream(outfile);

    // Set once the wanted entry shows up; settles when the file is flushed.
    let extraction: Promise<void> | undefined;

    try {
      await pipeline(
        httpStream.body as unknown as NodeJS.ReadableStream,
        unzipper.Parse().on('entry', (entry) => {
          if (entry.path === DUMP_ENTRY && !extraction) {
            extraction = pipeline(entry, output);
            // Mark the rejection as handled; it is re-observed by the await
            // below, or superseded by an earlier failure of the outer pipeline.
            extraction.catch(() => undefined);
          } else {
            entry.autodrain();
          }
        }),
      );

      if (!extraction) {
        throw new Error(`${DUMP_ENTRY} not found in ${GEONAMES_DUMP}`);
      }
      // The parser finishing does not imply the file was flushed to disk.
      await extraction;
    } catch (err) {
      output.destroy();
      throw err;
    }

    return outfile;
  }

  /**
   * Reads a tab-separated dump line by line and saves each row as a `Geoname`,
   * committing the transaction every `IMPORT_BATCH_SIZE` rows.
   *
   * A failed batch commit is logged and rolled back, and the import continues
   * with the next batch. Any other error rolls back the open transaction and
   * is rethrown. The query runner is always released.
   *
   * @param path Path of the extracted dump file.
   */
  async parseGeodump(path: string) {
    const read = fs.createReadStream(path);
    const rl = createInterface({
      terminal: false,
      input: read,
      crlfDelay: Infinity,
    });

    const queryRunner = this.dataSource.createQueryRunner();
    await queryRunner.connect();

    let entities = 0;
    let totalBatches = 0;
    let totalEntities = 0;

    try {
      await queryRunner.startTransaction();

      for await (const line of rl) {
        // A blank line (e.g. a trailing newline) carries no record.
        if (line.trim() === '') {
          continue;
        }

        const split = line.split('\t');
        const model = new Geoname();

        Object.assign(model, {
          geonameid: parseInt(split[0], 10),
          name: split[1],
          asciiname: split[2],
          alternatenames: split[3],
          latitude: floatOrNull(split[4]),
          longitude: floatOrNull(split[5]),
          featureclass: split[6],
          featurecode: split[7],
          countrycode: split[8],
          cc2: split[9],
          admin1code: split[10] || null,
          admin2code: split[11] || null,
          admin3code: split[12] || null,
          admin4code: split[13] || null,
          population: intOrNull(split[14]),
          elevation: intOrNull(split[15]),
          dem: intOrNull(split[16]),
          timezone: split[17],
          moddate: split[18],
        });

        await queryRunner.manager.save(model);
        entities += 1;
        totalEntities += 1;

        if (entities >= IMPORT_BATCH_SIZE) {
          totalBatches += 1;
          try {
            await queryRunner.commitTransaction();
          } catch (err) {
            Logger.error(`Some fields failed to insert: ${err}`, context);
            // Only roll back if the failed commit left the transaction open.
            if (queryRunner.isTransactionActive) {
              await queryRunner.rollbackTransaction();
            }
          }

          if (totalBatches % 10 === 0) {
            Logger.log(
              `Batch ${totalBatches} committed, ${totalEntities} entities so far`,
              context,
            );
          }

          await queryRunner.startTransaction();
          entities = 0;
        }
      }

      await queryRunner.commitTransaction();
    } catch (err) {
      if (queryRunner.isTransactionActive) {
        try {
          await queryRunner.rollbackTransaction();
        } catch (rollbackErr) {
          // Keep the original error; the rollback failure is only reported.
          Logger.error(`Rollback failed: ${rollbackErr}`, context);
        }
      }
      throw err;
    } finally {
      rl.close();
      read.destroy();
      await queryRunner.release();
    }
  }

  /**
   * Runs one full import: create a work directory, download and extract the
   * dump, load it into the database, and remove the work directory again
   * (also when a step fails).
   */
  async runUpdateCycle() {
    Logger.log('Starting geonames importer', context);
    const workDirectory = await this.workDirectory();

    try {
      Logger.log('Downloading dump...', context);
      const file = await this.downloadGeodump(workDirectory);

      Logger.log('Creating database...', context);
      await this.parseGeodump(file);
    } finally {
      Logger.log('Cleaning up', context);
      await workDirectory.remove();
    }

    Logger.log('Done!', context);
  }
}