A toolkit for projects in Python, based on PyMySQL.
MySQLKit: Packaged PyMySQL connection manager.
MySQLKitConfig: Configuration definition.
MySQLCondition: Conditional expression definition.
MySQLQueryResult: Packaged table query result.
MySQLTableExistence: The three points of a table or view: connection, schema and table.
MySQLViewMixin: Defines readers of a table or view.
MySQLTableMixin: Defines writers of a table.
MySQLAnyTable: A dynamic table model definition.
MySQLTableSelection: Chained selection reader for a table or view.

Write a proper configuration in Python to get an instance of MySQLKitConfig.
By default, PyMySQL leaves auto_commit as False. Setting it to True may be a better idea.
# db_config : MySQLKitConfig
db = MySQLKit(db_config)
To reuse or wrap an existing instance of Connection, use:
# connection is an instance of pymysql.connections.Connection
db = MySQLKit.make_instance_from_pymysql_connection(connection)
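As a rough illustration of where such a connection might come from, the sketch below collects keyword arguments for pymysql.connect() with autocommit switched on. The helper names build_connection_kwargs and open_connection, and the host, user and password values, are hypothetical placeholders and not part of this toolkit; only the keyword names are PyMySQL's own.

```python
def build_connection_kwargs(host, user, password, database, autocommit=True):
    """Collect keyword arguments accepted by pymysql.connect()."""
    return {
        "host": host,
        "port": 3306,
        "user": user,
        "password": password,
        "database": database,
        "charset": "utf8mb4",
        # PyMySQL defaults to autocommit=False; turn it on explicitly.
        "autocommit": autocommit,
    }


def open_connection(**kwargs):
    """Open a pymysql.connections.Connection from the collected arguments."""
    import pymysql  # imported lazily so the helper above works without the driver

    return pymysql.connect(**kwargs)


# Placeholder credentials, for illustration only.
kwargs = build_connection_kwargs("127.0.0.1", "app_user", "secret", "schema_name")

# With a reachable MySQL server, the connection could then be wrapped:
# connection = open_connection(**kwargs)
# db = MySQLKit.make_instance_from_pymysql_connection(connection)
```

An already opened connection can also be switched with connection.autocommit(True), which is PyMySQL's method for changing the mode after connecting.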
The lowest-level option is to use pymysql.connections.Connection directly.
If you just want to run a SQL statement, use the following…
Note the raw_execute_transaction method.
It takes a callable that runs the queries inside the transaction:
when the callable returns successfully, the transaction is committed and the callable's result is returned;
when the callable raises any Exception, the transaction is rolled back and the Exception is re-raised to the caller.
# With a simple lambda expression
inserted_id = db.raw_execute_transaction(lambda db: db.raw_query_to_insert_one('insert into ...'))
# Or with an inner def
def act_in_transaction(db: MySQLKit):
    inserted_id = db.raw_query_to_insert_one('insert into ...')
    if inserted_id <= 0:
        raise Exception('cannot ...')
    afx = db.raw_query_to_modify_one('update ...')
    return afx

afx = db.raw_execute_transaction(act_in_transaction)
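The commit-or-rollback behaviour described above can be sketched independently of MySQL. The example below is not the toolkit's implementation: it uses the standard-library sqlite3 module so it runs without a server, and execute_transaction, insert_ok and insert_then_fail are hypothetical names chosen for illustration.

```python
import sqlite3


def execute_transaction(connection, action):
    """Run action(connection); commit on success, roll back and re-raise on error."""
    try:
        result = action(connection)
        connection.commit()
        return result
    except Exception:
        connection.rollback()
        raise


conn = sqlite3.connect(":memory:")
conn.execute("create table item (id integer primary key, name text)")
conn.commit()


def insert_ok(c):
    # Returns the id of the inserted row.
    return c.execute("insert into item (name) values (?)", ("a",)).lastrowid


def insert_then_fail(c):
    # The insert happens, then the failure forces a rollback.
    c.execute("insert into item (name) values (?)", ("b",))
    raise RuntimeError("cannot ...")


inserted_id = execute_transaction(conn, insert_ok)

try:
    execute_transaction(conn, insert_then_fail)
except RuntimeError:
    pass  # the exception reaches the caller after the rollback

# Only the row from the successful transaction remains.
row_count = conn.execute("select count(*) from item").fetchone()[0]
```

Re-raising after the rollback keeps the failure visible to the caller, which matches the behaviour described for raw_execute_transaction.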
The raw query methods above are useful for running SQL with optional parameters. Simple queries on a single table are expected to use the dynamic table model.
Select, insert, replace, delete and update rows in one table.
table = MySQLAnyTable(mysql_kit=db, table_name='table_name', schema_name='schema_name')
Among the parameters, schema_name is optional. If omitted, the default schema defined as db in the config is used.
With a chained call, you can make selections on a single table.
"""
select name, max(score) as max_score
from `schema_name`.`table_name`
where id > 500
group by name
order by id desc
limit 100 offset 100
"""
result = table.select_in_table() \
    .add_select_field('name') \
    .add_select_field('max(score)', 'max_score') \
    .add_condition(MySQLCondition.make_greater_than('id', '500')) \
    .set_group_by_fields(['name']) \
    .set_sort_expression('id desc') \
    .set_limit(100) \
    .set_offset(100) \
    .query_for_result_as_tuple_of_dict()
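To show how a chained call like this can map onto the SQL in the comment, here is a minimal sketch of a chainable builder. SelectionSketch and build_sql are hypothetical and are not the toolkit's MySQLTableSelection; the method names merely mirror the ones used above, and conditions are passed as raw SQL fragments for simplicity.

```python
class SelectionSketch:
    """Hypothetical stand-in for a chained reader, for illustration only."""

    def __init__(self, table_expression):
        self._table = table_expression
        self._fields = []
        self._conditions = []
        self._group_by = []
        self._sort = None
        self._limit = None
        self._offset = None

    def add_select_field(self, expression, alias=None):
        self._fields.append(f"{expression} as {alias}" if alias else expression)
        return self  # returning self is what makes the calls chainable

    def add_condition(self, condition_sql):
        self._conditions.append(condition_sql)
        return self

    def set_group_by_fields(self, fields):
        self._group_by = list(fields)
        return self

    def set_sort_expression(self, expression):
        self._sort = expression
        return self

    def set_limit(self, limit):
        self._limit = limit
        return self

    def set_offset(self, offset):
        self._offset = offset
        return self

    def build_sql(self):
        """Assemble the clauses in the order SQL expects them."""
        parts = ["select " + (", ".join(self._fields) or "*"), "from " + self._table]
        if self._conditions:
            parts.append("where " + " and ".join(self._conditions))
        if self._group_by:
            parts.append("group by " + ", ".join(self._group_by))
        if self._sort:
            parts.append("order by " + self._sort)
        if self._limit is not None:
            clause = f"limit {self._limit}"
            if self._offset is not None:
                clause += f" offset {self._offset}"
            parts.append(clause)
        return " ".join(parts)


sql = (
    SelectionSketch("`schema_name`.`table_name`")
    .add_select_field("name")
    .add_select_field("max(score)", "max_score")
    .add_condition("id > 500")
    .set_group_by_fields(["name"])
    .set_sort_expression("id desc")
    .set_limit(100)
    .set_offset(100)
    .build_sql()
)
```

The sketch concatenates raw fragments; real code should build conditions through a helper such as MySQLCondition so that values are escaped properly.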
You can set these options:
The methods to fetch results:
Sometimes you may meet a huge number of rows; in that case, use these methods to fetch them as a stream (i.e. without a cached buffer).
Those methods all return an instance of MySQLQueryResult.
See the class MySQLQueryResult for details.
First, check the status of result.
if result.get_status() == constant.MYSQL_QUERY_STATUS_ERROR:
    # now exception should be raised
    raise Exception(f"ERROR THERE: {result.get_error()}")
# here do next work
There are six status constants.
MYSQL_QUERY_STATUS_INIT = "INIT" # Initialized and not executed
MYSQL_QUERY_STATUS_QUERIED = "QUERIED" # Queried and fetched results in buffer, use `result.get_fetched_rows_as_tuple()` to get the tuple of rows.
MYSQL_QUERY_STATUS_EXECUTED = "EXECUTED" # Queried for modification and recorded the last inserted ID or the count of affected rows.
MYSQL_QUERY_STATUS_STREAMING = "STREAMING" # Queried and fetched the stream of rows, use `cursor=result.get_stream()` to fetch the instance of SS-Cursor class.
MYSQL_QUERY_STATUS_STREAMED = "STREAMED" # All queried rows in the stream have been read, and the related SS-Cursor instance is closed.
MYSQL_QUERY_STATUS_ERROR = "ERROR" # Error raised, use `result.get_error()` and `result.get_sql()` to debug.
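A small helper can turn these statuses into either rows or an exception. The sketch below is only an illustration under assumptions: FakeResult is a hypothetical stand-in that exposes the getters named in the comments above, and rows_or_raise is not part of the toolkit.

```python
MYSQL_QUERY_STATUS_QUERIED = "QUERIED"
MYSQL_QUERY_STATUS_ERROR = "ERROR"


class FakeResult:
    """Hypothetical stand-in for MySQLQueryResult, for illustration only."""

    def __init__(self, status, rows=(), error=None, sql=""):
        self._status, self._rows, self._error, self._sql = status, rows, error, sql

    def get_status(self):
        return self._status

    def get_fetched_rows_as_tuple(self):
        return self._rows

    def get_error(self):
        return self._error

    def get_sql(self):
        return self._sql


def rows_or_raise(result):
    """Return buffered rows, or raise with the error and SQL for debugging."""
    status = result.get_status()
    if status == MYSQL_QUERY_STATUS_ERROR:
        raise RuntimeError(f"query failed: {result.get_error()} (sql: {result.get_sql()})")
    if status == MYSQL_QUERY_STATUS_QUERIED:
        return result.get_fetched_rows_as_tuple()
    # Any other status means the result was not a buffered read.
    raise RuntimeError(f"unexpected status for a buffered read: {status}")


ok_rows = rows_or_raise(FakeResult(MYSQL_QUERY_STATUS_QUERIED, rows=({"name": "a"},)))

try:
    rows_or_raise(FakeResult(MYSQL_QUERY_STATUS_ERROR, error="syntax error", sql="select ..."))
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)
```

Including the SQL text in the message follows the hint above that result.get_error() and result.get_sql() are the two getters to use when debugging.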