P09 Database

Pandas to SQLite

Download the data for this case from p4css/PSS.

# Build the SQLite database data/pttpost.db from the pickled post dump.

import pickle
import sqlite3

import pandas as pd

# Load data from pickle file
# NOTE(security): pickle.load can execute arbitrary code while deserializing;
# only open dump files obtained from a source you trust.
with open("data/pttpost_20210509_n178.dat", "rb") as fin:
    all_post = pickle.load(fin)

# Convert a list of dictionaries to a pandas DataFrame
df = pd.DataFrame(all_post)

# Select columns
post_df = df[['author', 'title', 'content', 'authorid', 'nickname', 'link', 'timestamp']]

# Create a SQLite database pttpost.db
# Insert post_df into the table posts (an existing posts table is replaced)
conn = sqlite3.connect('data/pttpost.db')
try:
    post_df.to_sql('posts', conn, if_exists='replace', index=False)
finally:
    # Close the connection even when to_sql raises, so the database file
    # handle is never left open.
    conn.close()

Design your API using Flask

  1. Create an app.py file and paste the code shown below these steps into it

  2. Go to the terminal and run the following command: python app.py

  3. Open a browser and send a request to http://localhost:8070/

  4. Open a browser and send a request to http://localhost:8070/data?n=10

  5. Open a browser and send a request to http://localhost:8070/data?fields=author,title,content&n=10

  6. Open a browser and send a request to http://localhost:8070/data?fields=author,title,content&keyword=中國&n=10

"""
from flask import Flask, jsonify, request
import sqlite3

# Flask application object; the @app.route decorators below register views on it.
app = Flask(__name__)

# Path of the SQLite file opened by get_db_connection(). It is the same
# relative path the loading script writes the posts table to.
DATABASE = 'data/pttpost.db'

def get_db_connection():
    '''Open a new connection to DATABASE that yields sqlite3.Row rows.'''
    connection = sqlite3.connect(DATABASE)
    # sqlite3.Row keeps the column names, so callers can build dict(row).
    connection.row_factory = sqlite3.Row
    return connection

def get_data(fields=None, keyword=None, n=5):
    '''Return up to n rows of the posts table as a list of dicts.

    Args:
        fields: Comma-separated column names to return. Surrounding
            whitespace is ignored and names outside the whitelist are
            dropped; if nothing valid remains, or fields is empty or None,
            every column is returned.
        keyword: If non-empty, only rows whose title matches
            LIKE '%keyword%' are returned (so % and _ inside keyword act
            as SQL wildcards).
        n: Maximum number of rows. Negative values are treated as 0.

    Returns:
        A list with one dict per row, mapping column name to value.
    '''
    # Make sure the requested columns are valid. Only whitelisted names are
    # ever interpolated into the SQL text, so the f-string below cannot be
    # used for injection.
    valid_fields = {'title', 'content', 'author', 'authorid', 'nickname', 'timestamp', 'link'}
    requested = [name.strip() for name in fields.split(',')] if fields else []
    query_fields = [name for name in requested if name in valid_fields] or ['*']

    # SQLite treats a negative LIMIT as no upper bound, which would let a
    # single request dump the whole table.
    limit = max(n, 0)

    # Build the SQL query; user-supplied values are passed as bound parameters.
    query = f'SELECT {", ".join(query_fields)} FROM posts'
    params = []
    if keyword:
        query += ' WHERE title LIKE ?'
        params.append('%' + keyword + '%')
    query += ' LIMIT ?'
    params.append(limit)

    conn = get_db_connection()
    try:
        rows = conn.execute(query, params).fetchall()
    finally:
        # Always release the connection, even if the query fails.
        conn.close()
    return [dict(row) for row in rows]


@app.route('/')
def welcome():
    '''Return up to five posts as a JSON response.'''
    # jsonify keeps this route consistent with /data and also works on
    # Flask versions older than 2.2, where returning a bare list from a
    # view is rejected as an invalid response type.
    return jsonify(get_data(n=5))


@app.route('/data')
def data():
    '''Serve posts as JSON, shaped by the fields, keyword and n query parameters.'''
    params = request.args
    rows = get_data(
        params.get('fields'),
        params.get('keyword'),  # keyword taken from the URL query string
        params.get('n', default=5, type=int),  # falls back to 5 if absent or not an int
    )
    return jsonify(rows)


if __name__ == '__main__':
    # Start Flask's built-in server on port 8070 with debug mode off.
    app.run(port=8070, debug=False)
"""
'\nfrom flask import Flask, jsonify, request\nimport sqlite3\n\napp = Flask(__name__)\n\nDATABASE = \'data/pttpost.db\'\n\ndef get_db_connection():\n    conn = sqlite3.connect(DATABASE)\n    conn.row_factory = sqlite3.Row\n    return conn\n\ndef get_data(fields=None, keyword=None, n=5):\n    conn = get_db_connection()\n\n    # 確保提供的欄位有效\n    valid_fields = {\'title\', \'content\', \'author\', \'authorid\', \'timestamp\', \'link\'}\n    if fields:\n        query_fields = fields.split(\',\')\n        query_fields = [field for field in query_fields if field in valid_fields]\n        if not query_fields:\n            query_fields = [\'*\']\n    else:\n        query_fields = [\'*\']\n\n    # 構建SQL查詢\n    if keyword:\n        query = f\'SELECT {", ".join(query_fields)} FROM posts WHERE title LIKE ? LIMIT ?\'\n        data = conn.execute(query, (\'%\' + keyword + \'%\', n)).fetchall()\n    else:\n        query = f\'SELECT {", ".join(query_fields)} FROM posts LIMIT ?\'\n        data = conn.execute(query, (n,)).fetchall()\n\n    conn.close()\n    return [dict(row) for row in data]\n\n\n@app.route(\'/\')\ndef welcome():\n    return get_data(n=5)\n\n\n@app.route(\'/data\')\ndef data():\n    fields = request.args.get(\'fields\')\n    keyword = request.args.get(\'keyword\')  # 從URL獲取關鍵字\n    n = request.args.get(\'n\', default=5, type=int)\n    data = get_data(fields, keyword, n)\n    return jsonify(data)\n\n\nif __name__ == \'__main__\':\n    app.run(debug=False, port=8070)\n'