本文主要介绍了一个将 MongoDB 中的数据导入到 MySQL 中的 Python 工具类 MongoToMysql。该工具类实现了获取 MongoDB 数据类型、创建 MySQL 表结构以及将数据从 MongoDB 推送到 MySQL 等功能。
通过该工具类,用户可以轻松地将 MongoDB 中的数据导入到 MySQL 中,实现数据的转移和使用。
使用该工具类,用户需要传入相应的参数,包括 MongoDB 的连接信息,MySQL 的连接信息,以及表名、是否设置最大长度、批处理大小和表描述等信息。具体使用可以参考代码中的注释。
实现代码
import pymysql from loguru import logger class MongoToMysql: def __init__(self, mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port, mysql_user, mysql_password, mysql_db,table_name=None,set_max_length=False,batch_size=10000,table_description=''): self.mongo_host = mongo_host self.mongo_port = mongo_port self.mongo_db = mongo_db self.mongo_collection = mongo_collection self.mysql_host = mysql_host self.mysql_port = mysql_port self.mysql_user = mysql_user self.mysql_password = mysql_password self.mysql_db = mysql_db self.table_name = table_name self.set_max_length = set_max_length self.batch_size = batch_size self.table_description = table_description self.data_types = self.get_mongo_data_types() self.create_mysql_table(self.data_types,set_max_length= self.set_max_length,table_description=self.table_description) self.push_data_to_mysql(self.batch_size) def get_mongo_data_types(self): logger.debug('正在获取mongo中字段的类型!') client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port) db = client[self.mongo_db] collection = db[self.mongo_collection] data_types = {} for field in collection.find_one().keys(): data_types[field] = type(collection.find_one()[field]).__name__ return data_types def check_mysql_table_exists(self): logger.debug('检查是否存在该表,有则删之!') conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user, password=self.mysql_password, db=self.mysql_db) cursor = conn.cursor() sql = f"DROP TABLE IF EXISTS {self.mongo_collection}" cursor.execute(sql) conn.commit() conn.close() def get_max_length(self, field): logger.debug(f'正在获取字段 {field} 最大长度......') client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port) db = client[self.mongo_db] collection = db[self.mongo_collection] max_length = 0 for item in collection.find({},{field:1,'_id':0}): value = item.get(field) if isinstance(value, str) and len(value) > max_length: max_length = len(value) return max_length def create_mysql_table(self, data_types,set_max_length,table_description): logger.debug('正在mysql中创建表结构!') self.check_mysql_table_exists() conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user, password=self.mysql_password, db=self.mysql_db) cursor = conn.cursor() if self.table_name: table_name = self.table_name else: table_name = self.mongo_collection fields = [] for field, data_type in data_types.items(): if data_type == 'int': fields.append(f"{field} INT") elif data_type == 'float': fields.append(f"{field} FLOAT") elif data_type == 'bool': fields.append(f"{field} BOOLEAN") else: if set_max_length: fields.append(f"{field} TEXT)") else: max_length = self.get_max_length(field) fields.append(f"{field} VARCHAR({max_length + 200})") fields_str = ','.join(fields) sql = f"CREATE TABLE {table_name} (id INT PRIMARY KEY AUTO_INCREMENT,{fields_str}) COMMENT='{table_description}'" cursor.execute(sql) conn.commit() conn.close() def push_data_to_mysql(self, batch_size=10000): logger.debug('--- 正在准备从mongo中每次推送10000条数据到mysql ----') client = pymongo.MongoClient(host=self.mongo_host, port=self.mongo_port) db = client[self.mongo_db] collection = db[self.mongo_collection] conn = pymysql.connect(host=self.mysql_host, port=self.mysql_port, user=self.mysql_user, password=self.mysql_password, db=self.mysql_db) cursor = conn.cursor() if self.table_name: table_name = self.table_name else: table_name = self.mongo_collection # table_name = self.mongo_collection data = [] count = 0 for item in collection.find(): count += 1 row = [] for field, data_type in self.data_types.items(): value = item.get(field) if value is None: row.append(None) elif data_type == 'int': row.append(str(item.get(field, 0))) elif data_type == 'float': row.append(str(item.get(field, 0.0))) elif data_type == 'bool': row.append(str(item.get(field, False))) else: row.append(str(item.get(field, ''))) data.append(row) if count % batch_size == 0: placeholders = ','.join(['%s'] * len(data[0])) sql = f"INSERT INTO {table_name} VALUES (NULL,{placeholders})" cursor.executemany(sql, data) conn.commit() data = [] logger.debug(f'--- 已完成推送:{count} 条数据! ----') if data: placeholders = ','.join(['%s'] * len(data[0])) sql = f"INSERT INTO {table_name} VALUES (NULL,{placeholders})" cursor.executemany(sql, data) conn.commit() logger.debug(f'--- 已完成推送:{count} 条数据! ----') conn.close() if __name__ == '__main__': """MySQL""" mongo_host = '127.0.0.1' mongo_port = 27017 mongo_db = 'db_name' mongo_collection = 'collection_name' """MongoDB""" mysql_host = '127.0.0.1' mysql_port = 3306 mysql_user = 'root' mysql_password = '123456' mysql_db = 'mysql_db' table_description = '' # 表描述 mongo_to_mysql = MongoToMysql(mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port, mysql_user, mysql_password, mysql_db,table_description=table_description) # # table_name = None # 默认为None 则MySQL中的表名跟Mongo保持一致 # set_max_length = False # 默认为False 根据mongo中字段最大长度 加200 来设置字段的VARCHART长度 , 否则定义TEXT类型 # batch_size = 10000 # 控制每次插入数据量的大小 # table_description = '' # 表描述 # mongo_to_mysql = MongoToMysql(mongo_host, mongo_port, mongo_db, mongo_collection, mysql_host, mysql_port, # mysql_user, mysql_password, mysql_db,table_name,set_max_length,batch_size,table_description)
代码使用了 PyMongo、PyMySQL 和 Loguru 等 Python 库,并封装了一个 MongoToMysql 类。在类的初始化时,会自动获取 MongoDB 中字段的类型,并根据字段类型创建 MySQL 表结构。在将数据从 MongoDB 推送到 MySQL 时,还可以控制每次插入数据量的大小,以避免一次性插入大量数据造成系统崩溃或性能下降。
需要注意的是,在创建 MySQL 表结构时,如果用户选择了设置最大长度,则会创建 TEXT 类型的字段,否则会根据 MongoDB 中字段的最大长度加上200来设置 VARCHAR 类型的字段长度。
总之,本文介绍的 MongoToMysql 工具类非常方便实用,对于需要将 MongoDB 数据迁移到 MySQL 的用户来说,是一种很好的解决方案。
原文地址:https://juejin.cn/post/7229152434801721402