Building the Database - 流动知识检索
Readwise is the most comfortable after-reading app I have used, but I never made good use of its API, and it was never integrated into my PKM workflow. Meanwhile, the volume of this flowing knowledge keeps growing; measured on a 10- or 20-year horizon, it has to be organized and managed with a database. I happened to come across this project: pocketbase/pocketbase: Open Source realtime backend in 1 file — a very clean admin UI, S3 data backup, and API import/export. There is also a lighter-weight option, teableio/teable: ✨ The Next Gen Airtable Alternative: No-Code Postgres, which I have been using for half a year without ever looking closely at its API. You can't import data directly through a database-level API, but you can add records to a table directly through the REST API.
pocketbase's advantage is that it is better suited to storing long-form content and has a built-in editor. It cannot import spreadsheets or CSV files directly, but it can import JSON and back itself up to S3 automatically.
teable leans toward visual tables and is not well suited to long-form content. Since all my long articles are saved via clip, which is geared toward reading experience, teable is used for fast lookups and is geared toward data. For now I store only three fields: title, url, and a one-sentence summary, so that I can recall and find related articles long after the fact.
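For reference, here is a minimal sketch of what adding such a three-field record through Teable's REST API looks like. The endpoint and payload shape mirror the `post_to_teable` function later in this post; the `Title`/`Source`/`Summary` field names are assumed to match the table's schema, and the env vars are placeholders:

```python
import os
import requests

# Minimal sketch: add one record to a Teable table via the REST API.
# TEABLE_TABLE_ID / TEABLE_TOKEN are placeholders; field names are assumed
# to match the table's schema (they follow post_to_teable later in the post).
response = requests.post(
    f"https://app.teable.io/api/table/{os.environ['TEABLE_TABLE_ID']}/record",
    headers={"Authorization": f"Bearer {os.environ['TEABLE_TOKEN']}"},
    json={
        "typecast": True,  # let Teable coerce values to the field types
        "records": [{
            "fields": {
                "Title": "Example article",
                "Source": "https://example.com",
                "Summary": "One-sentence summary for later recall",
            }
        }],
    },
)
response.raise_for_status()
```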
With Claude's assistance, I arrived at the current setup.
The main idea: use GitHub as a relay station. The osmos::memos extension saves articles, which are synced to Readwise for reading and highlighting, and then synced on to pocketbase or teable for database handling. For now I have gone with the teable option.
Syncing to Readwise
Now, when saving with the osmos::memos extension, entries are also saved to Readwise by default. Adding the #2clip tag triggers the clip workflow. See: 用 GitHub 仓库做书签和 AI 摘要 - 流动知识检索.
This ties everything together. All links are saved in the bookmark-collection repository as the relay station, then read in Readwise, where highlights are exported. Anything worth keeping triggers the clip workflow to save the full text.
The main idea: on every push, this Python script runs, reads the newly added lines, extracts the URLs, and saves them to Readwise Reader via its API. Note that Readwise and Readwise Reader are two separate APIs.
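For orientation, these are the two endpoints used in this post; both authenticate with the same `Authorization: Token <READWISE_TOKEN>` header:

```python
# Readwise Reader: save a document (used by the script below)
READER_SAVE_URL = "https://readwise.io/api/v3/save/"
# Readwise (classic): export highlights (used by the ReadwiseAPI class later)
READWISE_EXPORT_URL = "https://readwise.io/api/v2/export/"
```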
You need to add a READWISE_TOKEN secret to the repository, filled in with your own Readwise API token.
Python
```python
import os
import re
import logging

import requests
from github import Github

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

def get_added_content():
    """Get the lines added in this commit."""
    try:
        g = Github(os.environ['GITHUB_TOKEN'])
        repo = g.get_repo(os.environ['GITHUB_REPOSITORY'])
        commit_sha = os.environ.get('GITHUB_SHA')
        commit = repo.get_commit(commit_sha)

        for file in commit.files:
            if file.filename.endswith('.md'):
                if file.patch and '+' in file.patch:
                    added_lines = [
                        line[1:] for line in file.patch.split('\n')
                        if line.startswith('+') and not line.startswith('+++')
                    ]
                    return added_lines
        return []
    except Exception as e:
        logging.error(f"Error getting commit content: {str(e)}")
        raise

def extract_url(line: str):
    """Extract a URL from a line; supports markdown-style links."""
    # Updated URL extraction pattern
    url_pattern = r'\((https?://[\w\-._~:/?#\[\]@!$&\'()*+,;=.%]+)\)'
    match = re.search(url_pattern, line)
    if match:
        return match.group(1)
    return None

def has_clip_tag(line: str) -> bool:
    """Check whether the line contains the #2clip tag."""
    return bool(re.search(r'#2clip\b', line))

def trigger_workflow():
    """Trigger the other workflow."""
    try:
        g = Github(os.environ['GITHUB_TOKEN'])
        repo = g.get_repo(os.environ['GITHUB_REPOSITORY'])
        workflow = repo.get_workflow("bookmark_summary.yml")
        workflow.create_dispatch("main")
        logging.info("Successfully triggered the bookmark_summary workflow")
    except Exception as e:
        logging.error(f"Failed to trigger workflow: {str(e)}")
        raise

def main():
    try:
        # Collect all lines added in this push
        added_lines = get_added_content()
        if not added_lines:
            logging.info("No new markdown content found in this commit")
            return

        trigger_needed = False

        # Process each added line
        for line in added_lines:
            line = line.strip()
            logging.info(f"Processing line: {line}")

            # Check for the tag
            if has_clip_tag(line):
                trigger_needed = True
                logging.info("Found #2clip tag")

            # Extract and save the URL (with or without the tag)
            url = extract_url(line)
            if url:
                try:
                    response = requests.post(
                        url="https://readwise.io/api/v3/save/",
                        headers={"Authorization": f"Token {os.environ['READWISE_TOKEN']}"},
                        json={"url": url, "tags": ["Bookmark"]}
                    )
                    response.raise_for_status()
                    logging.info(f"Successfully saved URL: {url}")
                except requests.exceptions.RequestException as e:
                    logging.error(f"Failed to save URL {url}: {str(e)}")

        # Trigger the workflow if the tag was found
        if trigger_needed:
            logging.info("Triggering workflow due to #2clip tag")
            trigger_workflow()
    except Exception as e:
        logging.error(f"Error: {str(e)}")
        raise

if __name__ == "__main__":
    main()
```
Workflow
```yaml
name: Save Bookmark to Readwise

on:
  push:
    branches:
      - main
    paths:
      - '**.md'
  workflow_dispatch:

permissions:
  contents: read
  actions: write

jobs:
  save-to-readwise:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4
        with:
          token: ${{ secrets.GITHUB_TOKEN }}

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install requests PyGithub

      - name: Run bookmark saver
        env:
          READWISE_TOKEN: ${{ secrets.READWISE_TOKEN }}
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          GITHUB_REPOSITORY: ${{ github.repository }}
        run: python save_to_readwise.py
```
Teable
Add two secrets to the bookmark-collection repository: TEABLE_TABLE_ID and TEABLE_TOKEN. TEABLE_TABLE_ID is the ID of the table you want to write to; it is the value after &tableId= in the address bar. TEABLE_TOKEN is the API token.
Python
Modify the Python script in bookmark-summary:
```python
import re
from typing import List, Optional
import requests
import json
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, asdict
import os
import logging
import time
from functools import wraps
from urllib.parse import quote
import http.client

# -- configurations begin --
BOOKMARK_COLLECTION_REPO_NAME: str = "bookmark-collection"
BOOKMARK_SUMMARY_REPO_NAME: str = "bookmark-summary"
TEABLE_TABLE_ID: str = os.environ.get('TEABLE_TABLE_ID')
TEABLE_TOKEN: str = os.environ.get('TEABLE_TOKEN')
# -- configurations end --

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

def log_execution_time(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        logging.info(f'Entering {func.__name__}')
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        elapsed_time = end_time - start_time
        logging.info(f'Exiting {func.__name__} - Elapsed time: {elapsed_time:.4f} seconds')
        return result
    return wrapper

@dataclass
class SummarizedBookmark:
    year: str
    month: str   # yyyyMM
    title: str
    url: str
    timestamp: int   # unix timestamp
    summary: str

CURRENT_YEAR: str = datetime.now().strftime('%Y')
CURRENT_MONTH: str = datetime.now().strftime('%m')
CURRENT_DATE: str = datetime.now().strftime('%Y-%m-%d')
CURRENT_DATE_AND_TIME: str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')

@log_execution_time
def get_text_content(url: str) -> str:
    jina_url: str = f"https://r.jina.ai/{url}"
    response: requests.Response = requests.get(jina_url)
    return response.text

@log_execution_time
def call_openai_api(prompt: str, content: str) -> str:
    model: str = os.environ.get('OPENAI_API_MODEL', 'gpt-4o-mini')
    headers: dict = {
        "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
        "Content-Type": "application/json"
    }
    data: dict = {
        "model": model,
        "messages": [
            {"role": "system", "content": prompt},
            {"role": "user", "content": content}
        ]
    }
    api_endpoint: str = os.environ.get('OPENAI_API_ENDPOINT', 'https://api.openai.com/v1/chat/completions')
    response: requests.Response = requests.post(api_endpoint, headers=headers, data=json.dumps(data))
    return response.json()['choices'][0]['message']['content']

'''
def clean_prompt(prompt: str) -> str:
    """Clean and validate the prompt format."""
    # Strip extra whitespace
    prompt = prompt.strip()
    # Make sure the XML declaration is on the first line
    if not prompt.startswith('<?xml'):
        prompt = '<?xml version="1.0"?>\n' + prompt
    # Validate the XML format
    try:
        from xml.etree import ElementTree
        ElementTree.fromstring(prompt)
    except ElementTree.ParseError as e:
        logging.warning(f"Prompt XML format warning: {e}")
    return prompt
'''

@log_execution_time
def summarize_text(text: str) -> str:
    prompt: str = """
{#- 用简体中文进行文章摘要 -#}

## Profile:
- author: Vandee
- role: 文章内容深度总结思考助手
- language: 中文
- description: 全面的总结文章的主要观点,并结合严谨的逻辑思维分析文章要点,剖析文章内容。

## Goals:
- 第一步,仔细阅读文章内容。
- 第二步,对每个段落进行总结,总结文章的主要内容,理清楚作者表达了什么观点、作者解决了那些具体的问题。
- 第三步,文章要点总结。根据原文内容,提炼出文章的5个以内的主要观点或作者解决的问题。
- 第四步,根据上面三步,按照指定的输出格式,整理出文章内容的总结。

## Constrains:
- 文章内容总结的{摘要}字数控制在380个中文汉字以内。
- 尽可能还原文章中的专业词汇,并对其进行通俗解释。
- 在总结的过程中,完全按照文章作者的表达内容进行整理,不添加你的额外观点。
- 所有输出用中文生成。
- 文章内容里的"我"是文章的原作者,不要代入 Vandee 的身份。

## Skills:
- 善于用流畅通顺的简体中文总结内容重点。
- 具有良好的逻辑思维能力,能够深入分析文章内容。
- 掌握文章相关领域的专业知识,能够准确理解和阐述专业概念。
- 擅长以通俗易懂的方式解释复杂的专业内容。

## Workflows:
- 逐段阅读文章内容。
- 总结文章的内容并生成{摘要}。这一步你需要全面理解文章内容的主题、内容的逻辑框架、作者的提出的观点,摘要不少于270个中文汉字。
- 再次回顾原文所有内容,在上一步总结出{摘要}的基础上,进行深入分析。这一步你需要理清这些内容之间的逻辑关系、专业概念、名词概念,并着重关注原文内容里多次出现的词汇或概念,特别关注作者提出了什么观点、作者解决了那些具体的问题、作者体悟出了哪些道理、作者得出了什么重大的研究结论,最后梳理出{精炼内容}。
- 根据原文内容和你上一步的{精炼内容},提炼出文章的至少4个要点生成{要点总结},你不用输出{精炼内容}。
- 你需要按照markdown有序列表的格式列出上一步{要点总结}中的要点,并根据要点所在的原文并严格根据文章内容扩展对该要点的解析,方便读者理解这些要点的意思。
- 按照指定的输出格式,整理出文章内容的总结。"摘要"和"要点总结"只需要按照markdown格式加粗,不要用标题格式。

## OutputFormat:
**摘要**: {摘要}

**要点总结**: {要点总结}
"""
    result = call_openai_api(prompt, text)   # call the API and store the result
    time.sleep(1)   # wait 1 second
    return result   # return the result

@log_execution_time
def one_sentence_summary(text: str) -> str:
    prompt: str = "以下是对一篇长文的列表形式总结。请基于此输出对该文章的简短总结,长度不超过100个字。总是使用简体中文输出。"
    return call_openai_api(prompt, text)

def slugify(text: str) -> str:
    invalid_fs_chars: str = '/\\:*?"<>|'
    return re.sub(r'[' + re.escape(invalid_fs_chars) + r'\s]+', '-', text.lower()).strip('-')

def get_summary_file_path(title: str, timestamp: int, year: Optional[str] = None,
                          month: Optional[str] = None, in_readme_md: bool = False) -> Path:
    date_str = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d')
    summary_filename: str = f"{date_str}-{slugify(title)}.md"
    if year is None:
        year = CURRENT_YEAR
    if month is None:
        month = CURRENT_MONTH
    if in_readme_md:
        root: Path = Path(year, month)   # path is now year/month
    else:
        root: Path = Path(BOOKMARK_SUMMARY_REPO_NAME, year, month)   # path is now year/month
    return Path(root, summary_filename)

def get_text_content_path(title: str, in_summary_md: bool = False) -> Path:
    text_content_filename: str = f"{CURRENT_DATE}-{slugify(title)}_raw.md"
    root: Path = Path(BOOKMARK_SUMMARY_REPO_NAME, CURRENT_YEAR, CURRENT_MONTH)   # path is now YEAR/MONTH
    if in_summary_md:
        root = Path(".")
    return Path(root, text_content_filename)

def build_summary_file(title: str, url: str, summary: str, one_sentence: str) -> str:
    """Build the content of the summary file."""
    return f"""# {title}
- URL: {url}
- Added At: {CURRENT_DATE_AND_TIME}
- [Link To Text]({get_text_content_path(title, in_summary_md=True)})

## Summary

{summary}
"""

def build_index_md(title: str, url: str, summary: str, one_sentence: str, text_content: str) -> str:
    """Build the index.md content, adding YAML front matter and the full text."""
    # Handle colons in the title
    yaml_safe_title = title.replace(':', '-')
    return f"""---
title: {yaml_safe_title}
date: {CURRENT_DATE}
extra:
  source: {url}
  original_title: {title}
---

## Summary

{summary}

## Full Content

{text_content}
"""

def build_summary_readme_md(summarized_bookmarks: List[SummarizedBookmark]) -> str:
    initial_prefix: str = """# Clip

总会有一些没达到我想收录到PKM体系里标准的文章,但又弃之可惜。介于这两者之间的,就放在这个clip里了。区别于笔记,这里主要是原文的 Markdown。

Inspired by :[Owen's Clip](https://github.com/theowenyoung/clip) , [LLM x 书签收藏:摘要 & 全文索引 - Nekonull's Garden](https://nekonull.me/posts/llm_x_bookmark/)

## Summarized Bookmarks

"""
    summary_list: str = ""
    sorted_summarized_bookmarks = sorted(summarized_bookmarks, key=lambda bookmark: bookmark.timestamp, reverse=True)
    for bookmark in sorted_summarized_bookmarks:
        summary_file_path = get_summary_file_path(
            title=bookmark.title,
            timestamp=bookmark.timestamp,
            month=bookmark.month,
            in_readme_md=True
        )
        summary_list += f"- ({datetime.fromtimestamp(bookmark.timestamp).strftime('%Y-%m-%d')}) [{bookmark.title}]({summary_file_path})\n"
    return initial_prefix + summary_list

@log_execution_time
def post_to_teable(title: str, url: str, one_sentence: str) -> None:
    """Post a bookmark record to Teable"""
    conn = None
    try:
        conn = http.client.HTTPSConnection("app.teable.io")
        payload = {
            "typecast": True,
            "records": [{
                "fields": {
                    "Title": title,
                    "Source": url,
                    "Summary": one_sentence,
                }
            }]
        }
        headers = {
            'Authorization': f"Bearer {TEABLE_TOKEN}",
            'Content-Type': "application/json"
        }
        conn.request(
            "POST",
            f"/api/table/{TEABLE_TABLE_ID}/record",
            json.dumps(payload),
            headers
        )
        response = conn.getresponse()
        if response.status not in (200, 201):
            logging.error(f"Failed to post to Teable. Status: {response.status}, Response: {response.read().decode()}")
        else:
            logging.info("Successfully posted to Teable")
    except Exception as e:
        logging.error(f"Error posting to Teable: {str(e)}")
    finally:
        if conn is not None:
            conn.close()

@log_execution_time
def process_bookmark_file():
    # Read the bookmarks and the already-summarized bookmarks
    with open(f'{BOOKMARK_COLLECTION_REPO_NAME}/README.md', 'r', encoding='utf-8') as f:
        bookmark_lines = f.readlines()
    with open(f'{BOOKMARK_SUMMARY_REPO_NAME}/data.json', 'r', encoding='utf-8') as f:
        summarized_bookmark_dicts = json.load(f)
    summarized_bookmarks = [SummarizedBookmark(**bookmark) for bookmark in summarized_bookmark_dicts]
    summarized_urls = {bookmark.url for bookmark in summarized_bookmarks}

    # Find the first bookmark that has not been summarized yet
    title, url = None, None
    for line in bookmark_lines:
        match = re.search(r'- \[(.*?)\]\((.*?)\)', line)
        if match and match.group(2) not in summarized_urls:
            title, url = match.groups()
            break

    # Exit if there is no new bookmark
    if not title or not url:
        logging.info("No new bookmarks to summarize.")
        return

    # Slugify the title for use in file names
    title_slug = slugify(title)

    # Create the YEAR/MONTH/ directory
    monthly_path = Path(f'{BOOKMARK_SUMMARY_REPO_NAME}/{CURRENT_YEAR}/{CURRENT_MONTH}')
    monthly_path.mkdir(parents=True, exist_ok=True)

    # Create the content/YEAR/MONTH/TITLE/ directory
    content_path = Path(f'{BOOKMARK_SUMMARY_REPO_NAME}/content/{CURRENT_YEAR}/{CURRENT_MONTH}/{title_slug}')
    content_path.mkdir(parents=True, exist_ok=True)

    # Fetch and summarize the content
    text_content = get_text_content(url)
    summary = summarize_text(text_content)
    one_sentence = one_sentence_summary(summary)
    timestamp = int(datetime.now().timestamp())

    # Build a prefix from the current date
    date_prefix = datetime.now().strftime('%Y-%m-%d-')

    # Save the raw content to YEAR/MONTH/yyyy-MM-dd-title_raw.md
    with open(monthly_path / f"{date_prefix}{title_slug}_raw.md", 'w', encoding='utf-8') as f:
        f.write(text_content)

    # Save the summary to YEAR/MONTH/yyyy-MM-dd-title.md
    summary_content = build_summary_file(title, url, summary, one_sentence)
    with open(monthly_path / f"{date_prefix}{title_slug}.md", 'w', encoding='utf-8') as f:
        f.write(summary_content)

    # Save index.md to content/YEAR/MONTH/TITLE/index.md
    index_content = build_index_md(title, url, summary, one_sentence, text_content)
    with open(content_path / "index.md", 'w', encoding='utf-8') as f:
        f.write(index_content)

    # Update the summarized-bookmarks data
    summarized_bookmarks.append(SummarizedBookmark(
        title=title,
        url=url,
        summary=one_sentence,
        year=CURRENT_YEAR,
        month=CURRENT_MONTH,
        timestamp=timestamp
    ))

    # Update the bookmark list and data.json
    with open(f'{BOOKMARK_SUMMARY_REPO_NAME}/Bookmarks_List.md', 'w', encoding='utf-8') as f:
        f.write(build_summary_readme_md(summarized_bookmarks))
    with open(f'{BOOKMARK_SUMMARY_REPO_NAME}/data.json', 'w', encoding='utf-8') as f:
        json.dump([asdict(bookmark) for bookmark in summarized_bookmarks], f, indent=2, ensure_ascii=False)

    # Post to Teable
    if TEABLE_TOKEN and TEABLE_TABLE_ID:
        post_to_teable(title, url, one_sentence)
    else:
        logging.warning("Teable API token or table ID not set, skipping Teable update")

def main():
    process_bookmark_file()

if __name__ == "__main__":
    main()
```
Workflow
Modify the YAML in the bookmark-collection repository:
```yaml
name: Bookmark Summary

on:
  workflow_dispatch:   # keep only manual triggers and dispatch from other workflows

concurrency:
  group: mygroup
  cancel-in-progress: false

jobs:
  summarize:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout bookmark-collection
        uses: actions/checkout@v2
        with:
          path: bookmark-collection

      - name: Checkout bookmark-summary
        uses: actions/checkout@v2
        with:
          repository: VandeeFeng/bookmark-summary
          path: bookmark-summary
          token: ${{ secrets.PAT }}

      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.x'

      - name: Install dependencies with retry
        uses: nick-fields/retry@v3   # wrap the dependency installation with retry
        with:
          timeout_minutes: 2   # timeout per attempt (2 minutes)
          max_attempts: 3      # maximum number of attempts
          command: |
            python -m pip install --upgrade pip
            pip install requests waybackpy

      - name: Process changes with retry
        uses: nick-fields/retry@v3   # wrap the change-processing step
        with:
          timeout_minutes: 2   # timeout per attempt (2 minutes)
          max_attempts: 3      # maximum number of attempts
          command: |
            OPENAI_API_KEY=${{ secrets.OPENAI_API_KEY }} \
            OPENAI_API_MODEL=${{ secrets.OPENAI_API_MODEL }} \
            OPENAI_API_ENDPOINT=${{ secrets.OPENAI_API_ENDPOINT }} \
            TEABLE_TABLE_ID=${{ secrets.TEABLE_TABLE_ID }} \
            TEABLE_TOKEN=${{ secrets.TEABLE_TOKEN }} \
            python bookmark-summary/process_changes.py

      - name: Commit changes to bookmark-summary
        run: |
          cd bookmark-summary
          git config --local user.email "action@github.com"
          git config --local user.name "GitHub Action"
          git add .
          git commit -m "Add new summaries" || echo "No changes to commit"
          git push
```
Bash
Along the way, I also saved my websites list into teable with a bash script. I used to manage it with org-capture, but there are too many entries now to search comfortably. I had Claude convert the old list into a CSV and imported it into teable.
```bash
#!/bin/bash

while true; do
    # Prompt the user for input
    read -p "Enter Name (or type 'q' to quit): " name
    if [[ "$name" == "q" ]]; then
        echo "Exiting..."
        break
    fi

    read -p "Enter Intro (or type 'q' to quit): " intro
    if [[ "$intro" == "q" ]]; then
        echo "Exiting..."
        break
    fi

    read -p "Enter Source (or type 'q' to quit): " source
    if [[ "$source" == "q" ]]; then
        echo "Exiting..."
        break
    fi

    # Send the POST request. The heredoc payload below is reconstructed
    # (the original was truncated); field names are assumed to match the table.
    curl --request POST \
        --url https://app.teable.io/api/table/TEABLE_ID/record \
        --header 'Authorization: Bearer TEABLE_TOKEN' \
        --header 'content-type: application/json' \
        --data "$(cat <<EOF
{
  "typecast": true,
  "records": [
    {
      "fields": {
        "Name": "$name",
        "Intro": "$intro",
        "Source": "$source"
      }
    }
  ]
}
EOF
)"
done
```
PocketBase
You need to add two secrets, POCKETBASE_TOKEN and POCKETBASE_API, to the repository, filled in with your own pocketbase API URL and request-header token.
It took me quite a while with the docs to figure out this request header:
In the target collection's API Rules, you have to manually add @request.headers.x_token = "Your_token"; the value you put there is what goes into POCKETBASE_TOKEN. POCKETBASE_API is shown inside each collection.
Because a custom header is specified here, the Python side needs special handling:
```python
second_response = requests.post(
    url=os.environ['POCKETBASE_API'],
    headers={
        "x_token": f"{os.environ['POCKETBASE_TOKEN']}",
        "Content-Type": "application/json"
    },
    json={
        "URL": url,
        "title": title
    }
)
```
Docker deployment on a VPS
version: "3.7" services: pocketbase: image: ghcr.io/muchobien/pocketbase:latest container_name: pocketbase restart: unless-stopped ports: - "8090:8090" volumes: - "./data:/pb_data" healthcheck: #optional (recommended) since v0.10.0 test: wget --no-verbose --tries=1 --spider http://localhost:8090/api/health || exit 1 interval: 5s timeout: 5s retries: 5
Python
Modify the Python script in the bookmark-collection repository:
```python
import os
import re
import logging

import requests
from github import Github

logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)

def get_added_content():
    """Get the lines added in this commit."""
    try:
        g = Github(os.environ['GITHUB_TOKEN'])
        repo = g.get_repo(os.environ['GITHUB_REPOSITORY'])
        commit_sha = os.environ.get('GITHUB_SHA')
        commit = repo.get_commit(commit_sha)

        for file in commit.files:
            if file.filename.endswith('.md'):
                if file.patch and '+' in file.patch:
                    added_lines = [
                        line[1:] for line in file.patch.split('\n')
                        if line.startswith('+') and not line.startswith('+++')
                    ]
                    return added_lines
        return []
    except Exception as e:
        logging.error(f"Error getting commit content: {str(e)}")
        raise

def extract_url_and_title(line: str):
    """Extract the URL and title from a markdown link of the form [title](url)."""
    # Updated pattern that captures both the title and the URL
    pattern = r'\[(.*?)\]\((https?://[\w\-._~:/?#\[\]@!$&\'()*+,;=.%]+)\)'
    match = re.search(pattern, line)
    if match:
        title = match.group(1)
        url = match.group(2)
        return url, title
    return None, None

def has_clip_tag(line: str) -> bool:
    """Check whether the line contains the #2clip tag."""
    return bool(re.search(r'#2clip\b', line))

def trigger_workflow():
    """Trigger the other workflow."""
    try:
        g = Github(os.environ['GITHUB_TOKEN'])
        repo = g.get_repo(os.environ['GITHUB_REPOSITORY'])
        workflow = repo.get_workflow("bookmark_summary.yml")
        workflow.create_dispatch("main")
        logging.info("Successfully triggered the bookmark_summary workflow")
    except Exception as e:
        logging.error(f"Failed to trigger workflow: {str(e)}")
        raise

def main():
    try:
        # Collect all lines added in this push
        added_lines = get_added_content()
        if not added_lines:
            logging.info("No new markdown content found in this commit")
            return

        trigger_needed = False

        # Process each added line
        for line in added_lines:
            line = line.strip()
            logging.info(f"Processing line: {line}")

            # Check for the tag
            if has_clip_tag(line):
                trigger_needed = True
                logging.info("Found #2clip tag")

            # Extract and save the URL and title (with or without the tag)
            url, title = extract_url_and_title(line)
            if url:
                try:
                    # Send to Readwise
                    response = requests.post(
                        url="https://readwise.io/api/v3/save/",
                        headers={"Authorization": f"Token {os.environ['READWISE_TOKEN']}"},
                        json={"url": url, "tags": ["Bookmark"]}
                    )
                    response.raise_for_status()
                    logging.info(f"Successfully saved URL to Readwise: {url}")

                    # Send to the second API endpoint (pocketbase)
                    second_response = requests.post(
                        url=os.environ['POCKETBASE_API'],
                        headers={
                            "x_token": f"{os.environ['POCKETBASE_TOKEN']}",
                            "Content-Type": "application/json"
                        },
                        json={"URL": url, "title": title}
                    )
                    second_response.raise_for_status()
                    logging.info(f"Successfully saved URL to pocketbase: {url}")
                except requests.exceptions.RequestException as e:
                    logging.error(f"Failed to save URL {url}: {str(e)}")

        # Trigger the workflow if the tag was found
        if trigger_needed:
            logging.info("Triggering workflow due to #2clip tag")
            trigger_workflow()
    except Exception as e:
        logging.error(f"Error: {str(e)}")
        raise

if __name__ == "__main__":
    main()
```
Workflow
Modify the YAML in the bookmark-collection repository:
```yaml
name: Save Bookmark to Readwise

on:
  push:
    branches:
      - main
    paths:
      - '**.md'
  workflow_dispatch:

permissions:
  contents: read
  actions: write

jobs:
  save-to-readwise:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4
        with:
          token: ${{ secrets.GITHUB_TOKEN }}

      - name: Set up Python
        uses: actions/setup-python@v4
        with:
          python-version: '3.10'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install requests PyGithub

      - name: Run bookmark saver
        env:
          READWISE_TOKEN: ${{ secrets.READWISE_TOKEN }}
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          GITHUB_REPOSITORY: ${{ github.repository }}
          POCKETBASE_API: ${{ secrets.POCKETBASE_API }}
          POCKETBASE_TOKEN: ${{ secrets.POCKETBASE_TOKEN }}
        run: python save_to_readwise.py
```
Readwise highlights
I wrote a ReadwiseAPI class so that other projects can import it. It can fetch the title and URL of all my highlights on a schedule. Later, the highlights can be imported straight into pocketbase; a rough sketch of that step follows the class code below.
Python
```python
import requests
import json
from datetime import datetime, timedelta
import os
from typing import List, Dict, Optional
from pathlib import Path
import re
from github import Github
import argparse

class ReadwiseAPI:
    """Readwise API client for exporting highlights with smart update capability and GitHub integration"""

    def __init__(self):
        # Initialize Readwise token
        self.readwise_token = os.environ.get("READWISE_TOKEN")
        if not self.readwise_token:
            raise ValueError("READWISE_TOKEN not found in environment variables")

        # Initialize GitHub token
        self.github_token = os.environ.get("GITHUB_TOKEN")
        if not self.github_token:
            raise ValueError("GITHUB_TOKEN not found in environment variables")

        # Get repository from GitHub Actions environment variable
        self.github_repo = os.environ.get("GITHUB_REPOSITORY")
        if not self.github_repo:
            raise ValueError("Not running in GitHub Actions environment (GITHUB_REPOSITORY not found)")

        # Initialize GitHub client
        self.github = Github(self.github_token)
        self.repo = self.github.get_repo(self.github_repo)

        # Initialize Readwise API settings
        self.base_url = "https://readwise.io/api/v2"
        self.headers = {
            "Authorization": f"Token {self.readwise_token}"
        }
        self.last_update_file = "last_update.json"
        self.articles_file = "articles.json"

    def get_highlights(self, updated_after: Optional[datetime] = None,
                       start_date: Optional[datetime] = None,
                       end_date: Optional[datetime] = None) -> Dict:
        """Get all highlights with their associated metadata"""
        endpoint = f"{self.base_url}/export/"
        params = {}

        if updated_after:
            params["updated_after"] = updated_after.isoformat()
        elif start_date:
            params["updated_after"] = start_date.isoformat()
        if end_date:
            params["updated_before"] = end_date.isoformat()

        print(f"Fetching highlights with params: {params}")
        response = requests.get(endpoint, headers=self.headers, params=params)
        response.raise_for_status()
        return response.json()

    def get_file_content(self, path: str) -> Optional[str]:
        """Get file content from GitHub repository"""
        try:
            content = self.repo.get_contents(path)
            return content.decoded_content.decode('utf-8')
        except Exception as e:
            print(f"File {path} not found in repository: {e}")
            return None

    def update_file(self, path: str, content: str, message: str):
        """Update or create file in GitHub repository"""
        try:
            # Try to get existing file
            file = self.repo.get_contents(path)
            # Update existing file
            self.repo.update_file(
                path=path,
                message=message,
                content=content,
                sha=file.sha
            )
        except Exception:
            # Create new file if it doesn't exist
            self.repo.create_file(
                path=path,
                message=message,
                content=content
            )

    def clean_title(self, title: str) -> str:
        """Clean title by removing newlines and extra spaces"""
        title = re.sub(r'\s+', ' ', title.replace('\n', ' '))
        return title.strip()

    def create_article_json(self, highlights_data: Dict) -> List[Dict]:
        """Create a list of articles with title and URL, only for category 'articles'"""
        articles = []
        for article in highlights_data.get('results', []):
            if article.get('category', '').lower() == 'articles':
                title = self.clean_title(article.get('title', 'Untitled'))
                url = article.get('source_url', '')
                articles.append({
                    'title': title,
                    'url': url
                })
        return articles

    def load_last_update_from_github(self) -> Optional[datetime]:
        """Load the last update date from GitHub"""
        content = self.get_file_content(self.last_update_file)
        if content:
            try:
                data = json.loads(content)
                return datetime.strptime(data['last_update'], '%Y-%m-%d')
            except Exception as e:
                print(f"Error parsing last update file: {e}")
                return None
        return None

    def save_last_update_to_github(self):
        """Save current date as last update date to GitHub"""
        current_date = datetime.now().strftime('%Y-%m-%d')
        content = json.dumps({'last_update': current_date})
        self.update_file(
            path=self.last_update_file,
            content=content,
            message="Update last sync date"
        )

    def load_existing_articles_from_github(self) -> List[Dict]:
        """Load existing articles from GitHub"""
        content = self.get_file_content(self.articles_file)
        if content:
            try:
                return json.loads(content)
            except Exception as e:
                print(f"Error parsing articles file: {e}")
                return []
        return []

    def merge_articles(self, existing_articles: List[Dict], new_articles: List[Dict]) -> List[Dict]:
        """Merge new articles with existing ones, avoiding duplicates"""
        existing_set = {(article['title'], article['url']) for article in existing_articles}
        for article in new_articles:
            article_tuple = (article['title'], article['url'])
            if article_tuple not in existing_set:
                existing_articles.append(article)
                existing_set.add(article_tuple)
        return existing_articles

    def export_articles(self, start_date: Optional[str] = None,
                        end_date: Optional[str] = None,
                        all_time: bool = False):
        """
        Export articles to GitHub with smart update capability

        Args:
            start_date: Optional start date in YYYY-MM-DD format
            end_date: Optional end date in YYYY-MM-DD format
            all_time: If True, fetch all highlights regardless of dates
        """
        if all_time:
            # When all_time is selected, force-fetch all highlights and ignore the last update time
            print("Fetching all highlights from the beginning")
            highlights_data = self.get_highlights()
        elif start_date:
            # If a start date is given, use the specified date range
            start_datetime = datetime.strptime(start_date, '%Y-%m-%d')
            end_datetime = datetime.strptime(end_date, '%Y-%m-%d') if end_date else datetime.now()
            print(f"Fetching highlights from {start_date} to {end_date or 'now'}")
            highlights_data = self.get_highlights(start_date=start_datetime, end_date=end_datetime)
        else:
            # Otherwise use incremental updates based on the last update time
            last_update = self.load_last_update_from_github()
            if last_update:
                days_since_update = (datetime.now() - last_update).days
                print(f"Last update was {days_since_update} days ago on {last_update.strftime('%Y-%m-%d')}")
                if days_since_update > 0:
                    print(f"Fetching highlights updated after {last_update.strftime('%Y-%m-%d')}")
                    highlights_data = self.get_highlights(updated_after=last_update)
                else:
                    print("Already updated today, no need to fetch new articles")
                    return
            else:
                print("No previous update found, fetching all articles")
                highlights_data = self.get_highlights()

        # Create article data
        new_articles = self.create_article_json(highlights_data)
        print(f"Found {len(new_articles)} new articles")

        # Load existing articles
        existing_articles = self.load_existing_articles_from_github()
        print(f"Found {len(existing_articles)} existing articles")

        # Merge new articles with existing ones
        merged_articles = self.merge_articles(existing_articles, new_articles)
        print(f"Total unique articles after merge: {len(merged_articles)}")

        # Save merged articles to GitHub
        self.update_file(
            path=self.articles_file,
            content=json.dumps(merged_articles, ensure_ascii=False, indent=2),
            message="Update articles list"
        )

        # Update the last update date
        if not start_date and not all_time:
            # Only update the last sync time when no explicit date range or full refresh was requested
            self.save_last_update_to_github()

        print("Successfully updated articles in GitHub repository")
        if new_articles:
            print("New articles added:")
            for article in new_articles:
                print(f"- {article['title']}")

def main():
    # Read the GitHub Actions input parameters from environment variables
    gh_start_date = os.environ.get('INPUT_START_DATE', '')
    gh_end_date = os.environ.get('INPUT_END_DATE', '')
    gh_all_time = os.environ.get('INPUT_ALL_TIME', '').lower() == 'true'

    # Set up the command-line argument parser
    parser = argparse.ArgumentParser(description='Sync Readwise highlights to GitHub')
    parser.add_argument('--start-date', type=str, help='Start date in YYYY-MM-DD format')
    parser.add_argument('--end-date', type=str, help='End date in YYYY-MM-DD format')
    parser.add_argument('--all-time', action='store_true', help='Fetch all highlights from the beginning')
    args = parser.parse_args()

    # Prefer command-line arguments; fall back to the GitHub Actions inputs
    start_date = args.start_date or gh_start_date
    end_date = args.end_date or gh_end_date
    all_time = args.all_time or gh_all_time

    try:
        client = ReadwiseAPI()
        client.export_articles(
            start_date=start_date if start_date else None,
            end_date=end_date if end_date else None,
            all_time=all_time
        )
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        raise

if __name__ == "__main__":
    main()
```
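And here is the hedged sketch of that future PocketBase import mentioned above; nothing here is implemented in the repo yet. It reads the exported articles.json and posts each entry using the same x_token header and URL/title fields as the PocketBase section earlier:

```python
import json
import os

import requests

# Hedged sketch: import exported highlights into PocketBase.
# POCKETBASE_API / POCKETBASE_TOKEN are the same secrets as above;
# the URL/title field names follow the earlier PocketBase snippet.
with open("articles.json", "r", encoding="utf-8") as f:
    articles = json.load(f)

for article in articles:
    resp = requests.post(
        url=os.environ["POCKETBASE_API"],
        headers={
            "x_token": os.environ["POCKETBASE_TOKEN"],
            "Content-Type": "application/json",
        },
        json={"URL": article["url"], "title": article["title"]},
    )
    resp.raise_for_status()
```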
Workflow
```yaml
name: Sync Readwise Articles

on:
  schedule:
    # Run daily at 01:00 UTC (09:00 Beijing time)
    - cron: '0 1 * * *'
  # Allow manual triggering with input parameters
  workflow_dispatch:
    inputs:
      start_date:
        description: 'Start date (YYYY-MM-DD, e.g., 2024-01-01)'
        required: false
        type: string
        default: ''
      end_date:
        description: 'End date (YYYY-MM-DD, leave empty for current date)'
        required: false
        type: string
        default: ''
      all_time:
        description: 'Fetch all highlights (overrides date range if selected)'
        type: boolean
        required: false
        default: false

permissions:
  contents: write   # read/write access to repository contents

jobs:
  sync:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout repository
        uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.10'
          cache: 'pip'
          cache-dependency-path: '**/requirements.txt'

      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt

      - name: Run sync script
        env:
          READWISE_TOKEN: ${{ secrets.READWISE_TOKEN }}
          GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
          INPUT_START_DATE: ${{ github.event.inputs.start_date }}
          INPUT_END_DATE: ${{ github.event.inputs.end_date }}
          INPUT_ALL_TIME: ${{ github.event.inputs.all_time }}
        run: python readwise_sync.py

      - name: Check for changes
        id: verify-changed-files
        run: |
          if [ -n "$(git status --porcelain)" ]; then
            echo "changes_found=true" >> $GITHUB_OUTPUT
          else
            echo "changes_found=false" >> $GITHUB_OUTPUT
          fi

      - name: Commit changes
        if: steps.verify-changed-files.outputs.changes_found == 'true'
        run: |
          git config --local user.email "github-actions[bot]@users.noreply.github.com"
          git config --local user.name "github-actions[bot]"
          git add articles.json last_update.json
          git commit -m "Update Readwise articles [skip ci]" || echo "No changes to commit"

      - name: Push changes
        if: steps.verify-changed-files.outputs.changes_found == 'true'
        uses: ad-m/github-push-action@master
        with:
          github_token: ${{ secrets.GITHUB_TOKEN }}
          branch: ${{ github.ref }}
```