本文将详细探讨如何在Python中连接全种类数据库以及实现相应的CRUD(创建,读取,更新,删除)操作。我们将逐一解析连接MySQL,SQL Server,Oracle,PostgreSQL,MongoDB,SQLite,DB2,Redis,Cassandra,Microsoft Access,ElasticSearch,Neo4j,InfluxDB,Snowflake,Amazon DynamoDB,Microsoft Azure CosMos DB数据库的方法,并演示相应的CRUD操作。 MySQL连接数据库Python可以使用mysql-connector-python库连接MySQL数据库: import mysql.connectorconn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')print('Opened MySQL database successfully')conn.close() CRUD操作接下来,我们将展示在MySQL中如何进行基本的CRUD操作。 创建(Create)
读取(Retrieve)conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')cursor = conn.cursor()cursor.execute('SELECT id, name, address, salary from Employees')rows = cursor.fetchall()for row in rows: print('ID = ', row[0]) print('NAME = ', row[1]) print('ADDRESS = ', row[2]) print('SALARY = ', row[3])conn.close() 更新(Update)
删除(Delete)conn = mysql.connector.connect(user='username', password='password', host='127.0.0.1', database='my_database')cursor = conn.cursor()cursor.execute('DELETE from Employees where ID = 1')conn.commit()print('Total number of rows deleted :', cursor.rowcount)conn.close() SQL Server连接数据库Python可以使用pyodbc库连接SQL Server数据库:
CRUD操作接下来,我们将展示在SQL Server中如何进行基本的CRUD操作。 创建(Create)conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')cursor = conn.cursor()cursor.execute('CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME VARCHAR(20) NOT NULL, AGE INT, ADDRESS CHAR(50), SALARY REAL)')conn.commit()print('Table created successfully')conn.close() 读取(Retrieve)
更新(Update)conn = pyodbc.connect('DRIVER={SQL Server};SERVER=localhost;DATABASE=my_database;UID=username;PWD=password')cursor = conn.cursor()cursor.execute('UPDATE Employees set SALARY = 25000.00 where ID = 1')conn.commit()print('Total number of rows updated :', cursor.rowcount)conn.close() 删除(Delete)
Oracle连接数据库Python可以使用cx_Oracle库连接Oracle数据库: import cx_Oracledsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database') conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)print('Opened Oracle database successfully')conn.close() CRUD操作接下来,我们将展示在Oracle中如何进行基本的CRUD操作。 创建(Create)
读取(Retrieve)dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database') conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)cursor = conn.cursor()cursor.execute('SELECT id, name, address, salary from Employees')rows = cursor.fetchall()for row in rows: print('ID = ', row[0]) print('NAME = ', row[1]) print('ADDRESS = ', row[2]) print('SALARY = ', row[3])conn.close() 更新(Update)
删除(Delete)dsn_tns = cx_Oracle.makedsn('localhost', '1521', service_name='my_database') conn = cx_Oracle.connect(user='username', password='password', dsn=dsn_tns)cursor = conn.cursor()cursor.execute('DELETE from Employees where ID = 1')conn.commit()print('Total number of rows deleted :', cursor.rowcount)conn.close() PostgreSQL连接数据库Python可以使用psycopg2库连接PostgreSQL数据库:
CRUD操作接下来,我们将展示在PostgreSQL中如何进行基本的CRUD操作。 创建(Create)conn = psycopg2.connect(database='my_database', user='username', password='password', host='127.0.0.1', port='5432')cursor = conn.cursor()cursor.execute('''CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, AGE INT NOT NULL, ADDRESS CHAR(50), SALARY REAL);''')conn.commit()print('Table created successfully')conn.close() 读取(Retrieve)
更新(Update)conn = psycopg2.connect(database='my_database', user='username', password='password', host='127.0.0.1', port='5432')cursor = conn.cursor()cursor.execute('UPDATE Employees set SALARY = 25000.00 where ID = 1')conn.commit()print('Total number of rows updated :', cursor.rowcount)conn.close() 删除(Delete)
MongoDB连接数据库Python可以使用pymongo库连接MongoDB数据库: from pymongo import MongoClientclient = MongoClient('mongodb://localhost:27017/')db = client['my_database']print('Opened MongoDB database successfully')client.close() CRUD操作接下来,我们将展示在MongoDB中如何进行基本的CRUD操作。 创建(Create)在MongoDB中,文档的创建操作通常包含在插入操作中:
读取(Retrieve)client = MongoClient('mongodb://localhost:27017/')db = client['my_database']employees = db['Employees']cursor = employees.find()for document in cursor: print(document)client.close() 更新(Update)
删除(Delete)client = MongoClient('mongodb://localhost:27017/')db = client['my_database']employees = db['Employees']query = { 'id': '1' }employees.delete_one(query)print('Document deleted successfully')client.close() SQLite连接数据库Python使用sqlite3库连接SQLite数据库:
CRUD操作接下来,我们将展示在SQLite中如何进行基本的CRUD操作。 创建(Create)conn = sqlite3.connect('my_database.db')cursor = conn.cursor()cursor.execute('''CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, AGE INT NOT NULL, ADDRESS CHAR(50), SALARY REAL);''')conn.commit()print('Table created successfully')conn.close() 读取(Retrieve)
更新(Update)conn = sqlite3.connect('my_database.db')cursor = conn.cursor()cursor.execute('UPDATE Employees set SALARY = 25000.00 where ID = 1')conn.commit()print('Total number of rows updated :', cursor.rowcount)conn.close() 删除(Delete)
DB2连接数据库Python可以使用ibm_db库连接DB2数据库: import ibm_dbdsn = ( 'DRIVER={{IBM DB2 ODBC DRIVER}};' 'DATABASE=my_database;' 'HOSTNAME=127.0.0.1;' 'PORT=50000;' 'PROTOCOL=TCPIP;' 'UID=username;' 'PWD=password;')conn = ibm_db.connect(dsn, '', '')print('Opened DB2 database successfully')ibm_db.close(conn) CRUD操作接下来,我们将展示在DB2中如何进行基本的CRUD操作。 创建(Create)
读取(Retrieve)conn = ibm_db.connect(dsn, '', '')sql = 'SELECT id, name, address, salary from Employees'stmt = ibm_db.exec_immediate(conn, sql)while ibm_db.fetch_row(stmt): print('ID = ', ibm_db.result(stmt, 'ID')) print('NAME = ', ibm_db.result(stmt, 'NAME')) print('ADDRESS = ', ibm_db.result(stmt, 'ADDRESS')) print('SALARY = ', ibm_db.result(stmt, 'SALARY'))ibm_db.close(conn) 更新(Update)
删除(Delete)conn = ibm_db.connect(dsn, '', '')sql = 'DELETE from Employees where ID = 1'stmt = ibm_db.exec_immediate(conn, sql)ibm_db.commit(conn)print('Total number of rows deleted :', ibm_db.num_rows(stmt))ibm_db.close(conn) Microsoft Access连接数据库Python可以使用pyodbc库连接Microsoft Access数据库:
CRUD操作接下来,我们将展示在Access中如何进行基本的CRUD操作。 创建(Create)conn = pyodbc.connect(conn_str)cursor = conn.cursor()cursor.execute('''CREATE TABLE Employees (ID INT PRIMARY KEY NOT NULL, NAME TEXT NOT NULL, AGE INT NOT NULL, ADDRESS CHAR(50), SALARY DECIMAL(9, 2));''')conn.commit()print('Table created successfully')conn.close() 读取(Retrieve)
更新(Update)conn = pyodbc.connect(conn_str)cursor = conn.cursor()cursor.execute('UPDATE Employees set SALARY = 25000.00 where ID = 1')conn.commit()print('Total number of rows updated :', cursor.rowcount)conn.close() 删除(Delete)
Cassandra连接数据库Python可以使用cassandra-driver库连接Cassandra数据库: from cassandra.cluster import Clustercluster = Cluster(['127.0.0.1'])session = cluster.connect('my_keyspace')print('Opened Cassandra database successfully')cluster.shutdown() CRUD操作接下来,我们将展示在Cassandra中如何进行基本的CRUD操作。 创建(Create)
读取(Retrieve)cluster = Cluster(['127.0.0.1'])session = cluster.connect('my_keyspace')rows = session.execute('SELECT id, name, address, salary FROM Employees')for row in rows: print('ID = ', row.id) print('NAME = ', row.name) print('ADDRESS = ', row.address) print('SALARY = ', row.salary)cluster.shutdown() 更新(Update)
删除(Delete)cluster = Cluster(['127.0.0.1'])session = cluster.connect('my_keyspace')session.execute('DELETE FROM Employees WHERE id = 1')print('Row deleted successfully')cluster.shutdown() Redis连接数据库Python可以使用redis-py库连接Redis数据库:
CRUD操作接下来,我们将展示在Redis中如何进行基本的CRUD操作。 创建(Create)r = redis.Redis(host='localhost', port=6379, db=0)r.set('employee:1:name', 'John')r.set('employee:1:age', '30')r.set('employee:1:address', 'New York')r.set('employee:1:salary', '1000.00')print('Keys created successfully') 读取(Retrieve)
更新(Update)r = redis.Redis(host='localhost', port=6379, db=0)r.set('employee:1:salary', '25000.00')print('Key updated successfully') 删除(Delete)
ElasticSearch连接数据库Python可以使用elasticsearch库连接ElasticSearch数据库: from elasticsearch import Elasticsearches = Elasticsearch([{'host': 'localhost', 'port': 9200}])print('Opened ElasticSearch database successfully') CRUD操作接下来,我们将展示在ElasticSearch中如何进行基本的CRUD操作。 创建(Create)
读取(Retrieve)es = Elasticsearch([{'host': 'localhost', 'port': 9200}])res = es.get(index='employees', doc_type='employee', id=1)print('Document details:')for field, details in res['_source'].items(): print(f'{field.upper()} = ', details) 更新(Update)
删除(Delete)es = Elasticsearch([{'host': 'localhost', 'port': 9200}])res = es.delete(index='employees', doc_type='employee', id=1)print('Document deleted successfully') Neo4j连接数据库Python可以使用neo4j库连接Neo4j数据库:
CRUD操作接下来,我们将展示在Neo4j中如何进行基本的CRUD操作。 创建(Create)driver = GraphDatabase.driver('bolt://localhost:7687', auth=('neo4j', 'password'))with driver.session() as session: session.run('CREATE (:Employee {id: 1, name: 'John', age: 30, address: 'New York', salary: 1000.00})')print('Node created successfully')driver.close() 读取(Retrieve)
更新(Update)driver = GraphDatabase.driver('bolt://localhost:7687', auth=('neo4j', 'password'))with driver.session() as session: session.run('MATCH (n:Employee) WHERE n.id = 1 SET n.salary = 25000.00')print('Node updated successfully')driver.close() 删除(Delete)
InfluxDB连接数据库Python可以使用InfluxDB-Python库连接InfluxDB数据库: from influxdb import InfluxDBClientclient = InfluxDBClient(host='localhost', port=8086)print('Opened InfluxDB database successfully')client.close() CRUD操作接下来,我们将展示在InfluxDB中如何进行基本的CRUD操作。 创建(Create)
读取(Retrieve)client = InfluxDBClient(host='localhost', port=8086)result = client.query('SELECT 'name', 'age', 'address', 'salary' FROM 'employees'')for point in result.get_points(): print('ID = ', point['id']) print('NAME = ', point['name']) print('AGE = ', point['age']) print('ADDRESS = ', point['address']) print('SALARY = ', point['salary'])client.close() 更新(Update)InfluxDB的数据模型和其他数据库不同,它没有更新操作。但是你可以通过写入一个相同的数据点(即具有相同的时间戳和标签)并改变字段值,实现类似更新操作的效果。 删除(Delete)同样,InfluxDB也没有提供删除单个数据点的操作。然而,你可以删除整个系列(即表)或者删除某个时间段的数据。
Snowflake连接数据库Python可以使用 from snowflake.connector import connectcon = connect( user='username', password='password', account='account_url', warehouse='warehouse', database='database', schema='schema')print('Opened Snowflake database successfully')con.close() CRUD操作接下来,我们将展示在Snowflake中如何进行基本的CRUD操作。 创建(Create)
读取(Retrieve)con = connect( user='username', password='password', account='account_url', warehouse='warehouse', database='database', schema='schema')cur = con.cursor()cur.execute('SELECT * FROM EMPLOYEES WHERE ID = 1')rows = cur.fetchall()for row in rows: print('ID = ', row[0]) print('NAME = ', row[1]) print('AGE = ', row[2]) print('ADDRESS = ', row[3]) print('SALARY = ', row[4])con.close() 更新(Update)
删除(Delete)con = connect( user='username', password='password', account='account_url', warehouse='warehouse', database='database', schema='schema')cur = con.cursor()cur.execute('DELETE FROM EMPLOYEES WHERE ID = 1')print('Row deleted successfully')con.close() Amazon DynamoDB连接数据库Python可以使用boto3库连接Amazon DynamoDB:
CRUD操作接下来,我们将展示在DynamoDB中如何进行基本的CRUD操作。 创建(Create)table = dynamodb.create_table( TableName='Employees', KeySchema=[ { 'AttributeName': 'id', 'KeyType': 'HASH' }, ], AttributeDefinitions=[ { 'AttributeName': 'id', 'AttributeType': 'N' }, ], ProvisionedThroughput={ 'ReadCapacityUnits': 5, 'WriteCapacityUnits': 5 })table.put_item( Item={ 'id': 1, 'name': 'John', 'age': 30, 'address': 'New York', 'salary': 1000.00 })print('Table created and item inserted successfully') 读取(Retrieve)
更新(Update)table = dynamodb.Table('Employees')table.update_item( Key={ 'id': 1, }, UpdateExpression='SET salary = :val1', ExpressionAttributeValues={ ':val1': 25000.00 })print('Item updated successfully') 删除(Delete)
Microsoft Azure CosMos DB连接数据库Python可以使用azure-cosmos库连接Microsoft Azure CosMos DB: from azure.cosmos import CosmosClient, PartitionKey, exceptionsurl = 'Cosmos DB Account URL'key = 'Cosmos DB Account Key'client = CosmosClient(url, credential=key)database_name = 'testDB'database = client.get_database_client(database_name)container_name = 'Employees'container = database.get_container_client(container_name)print('Opened CosMos DB successfully') CRUD操作接下来,我们将展示在CosMos DB中如何进行基本的CRUD操作。 创建(Create)
读取(Retrieve)for item in container.read_all_items(): print(item) 更新(Update)
删除(Delete)for item in container.read_all_items(): if item['id'] == '1': container.delete_item(item, partition_key='1')print('Item deleted successfully') |
|