Vandee's Blog

10 Nov 2024

Setting Up a Database - Flowing Knowledge Retrieval

Readwise is the most comfortable after-reading app I have tried, but I never made good use of its API, and it was never folded into a unified workflow in my PKM system. Meanwhile, the data from this flowing knowledge keeps piling up; measured on a 10- or 20-year horizon, it needs a proper database to manage it. By coincidence I came across pocketbase/pocketbase: Open Source realtime backend in 1 file, which has a very clean admin UI, supports S3 backups, and supports API import and export. There is also a lighter option, teableio/teable: ✨ The Next Gen Airtable Alternative: No-Code Postgres, which I have been using for half a year without ever really studying its API. You cannot import data through the underlying database's API directly, but you can add records to a table through its own API.

pocketbase's strength is storing long-form content, with a built-in editor. It cannot import spreadsheets or CSV files directly, but it can import JSON and back itself up to S3 automatically.

teable leans toward visual tables and is not a great fit for long-form content. Since my long articles are already saved via clip, which is oriented toward the reading experience, teable handles quick lookups and is oriented toward the data. For now it stores only three fields: title, URL, and a one-sentence summary, so that I can recall and look up related articles long afterwards.

With Claude's help, this is the setup I ended up with.

The main idea: use GitHub as the relay. The osmos::memos extension saves articles, which are synced to Readwise for reading and highlighting, and then synced on to pocketbase or teable for database processing. For now I have gone with teable.

Syncing to Readwise

When saving with the osmos::memos extension, items are now also saved to Readwise by default. Adding the #2clip tag triggers the clip workflow. For details see: Using a GitHub Repo for Bookmarks and AI Summaries - Flowing Knowledge Retrieval

This ties everything together: all links land in the bookmark-collection repository as the relay, get read in Readwise, and highlights are exported from there. Anything worth keeping triggers the clip workflow to save the full text.

The main idea: on each push, this Python script runs, reads the newly added lines, extracts the URLs, and saves them to Readwise Reader via its API. Note that Readwise and Readwise Reader are two separate APIs.

Add a READWISE_TOKEN secret to the repository's secrets and fill in your Readwise API token.
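Before wiring it into the workflow, the Reader save endpoint can be exercised on its own. A minimal sketch, assuming READWISE_TOKEN is already exported (the article URL is just a placeholder):

import os
import requests

# Save a single URL to Readwise Reader (the v3 API; classic highlights live under v2)
response = requests.post(
    "https://readwise.io/api/v3/save/",
    headers={"Authorization": f"Token {os.environ['READWISE_TOKEN']}"},
    json={"url": "https://example.com/some-article", "tags": ["Bookmark"]},
)
response.raise_for_status()
print(response.status_code)  # 201 on first save, 200 if the URL was already saved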

Python

import os
import re
import logging
import requests
from github import Github
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
def get_added_content():
    """Collect the lines added across the markdown files in this commit."""
    try:
        g = Github(os.environ['GITHUB_TOKEN'])
        repo = g.get_repo(os.environ['GITHUB_REPOSITORY'])
        commit_sha = os.environ.get('GITHUB_SHA')
        commit = repo.get_commit(commit_sha)
        added_lines = []
        for file in commit.files:
            if file.filename.endswith('.md') and file.patch:
                added_lines.extend(line[1:] for line in file.patch.split('\n')
                                   if line.startswith('+') and not line.startswith('+++'))
        return added_lines
    except Exception as e:
        logging.error(f"Error getting commit content: {str(e)}")
        raise
def extract_url(line: str):
    """Extract a URL from the line; supports markdown-style links"""
    # URL extraction pattern (matches the URL inside parentheses)
    url_pattern = r'\((https?://[\w\-._~:/?#\[\]@!$&\'()*+,;=.%]+)\)'
    match = re.search(url_pattern, line)
    if match:
        return match.group(1)
    return None
def has_clip_tag(line: str) -> bool:
    """Check whether the line contains the #2clip tag"""
    return bool(re.search(r'#2clip\b', line))
def trigger_workflow():
    """Trigger the other workflow"""
    try:
        g = Github(os.environ['GITHUB_TOKEN'])
        repo = g.get_repo(os.environ['GITHUB_REPOSITORY'])
        workflow = repo.get_workflow("bookmark_summary.yml")
        workflow.create_dispatch("main")
        logging.info("Successfully triggered the bookmark_summary workflow")
    except Exception as e:
        logging.error(f"Failed to trigger workflow: {str(e)}")
        raise
def main():
    try:
        # Collect all newly added lines
        added_lines = get_added_content()
        if not added_lines:
            logging.info("No new markdown content found in this commit")
            return
        trigger_needed = False
        # Process each added line
        for line in added_lines:
            line = line.strip()
            logging.info(f"Processing line: {line}")
            # Check for the tag
            if has_clip_tag(line):
                trigger_needed = True
                logging.info("Found #2clip tag")
            # Extract and process the URL (whether or not the line has the tag)
            url = extract_url(line)
            if url:
                try:
                    response = requests.post(
                        url="https://readwise.io/api/v3/save/",
                        headers={"Authorization": f"Token {os.environ['READWISE_TOKEN']}"},
                        json={
                            "url": url,
                            "tags": ["Bookmark"]
                        }
                    )
                    response.raise_for_status()
                    logging.info(f"Successfully saved URL: {url}")
                except requests.exceptions.RequestException as e:
                    logging.error(f"Failed to save URL {url}: {str(e)}")
        # If the tag was found, trigger the workflow
        if trigger_needed:
            logging.info("Triggering workflow due to #2clip tag")
            trigger_workflow()
    except Exception as e:
        logging.error(f"Error: {str(e)}")
        raise
if __name__ == "__main__":
    main()

Workflow

name: Save Bookmark to Readwise
on:
  push:
    branches:
      - main
    paths:
      - '**.md'
  workflow_dispatch:
permissions:
  contents: read
  actions: write
jobs:
  save-to-readwise:
    runs-on: ubuntu-latest
    steps:
    - name: Checkout repository
      uses: actions/checkout@v4
      with:
        token: ${{ secrets.GITHUB_TOKEN }}
    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.10'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install requests PyGithub
    - name: Run bookmark saver
      env:
        READWISE_TOKEN: ${{ secrets.READWISE_TOKEN }}
        GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        GITHUB_REPOSITORY: ${{ github.repository }}
      run: python save_to_readwise.py
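Note that GITHUB_SHA is not listed in the env block: GitHub Actions exposes it (like GITHUB_REPOSITORY) as a default environment variable for every step, which is why the script can read it with os.environ.get('GITHUB_SHA').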

Teable

Add two secrets to the bookmark-collection repository: TEABLE_TABLE_ID and TEABLE_TOKEN.

TEABLE_TABLE_ID is the ID of the table to write to; in the address bar it is the value right after &tableId=. TEABLE_TOKEN is the API token.
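To verify the two values, a record can be posted from a local Python shell first. A minimal sketch with requests, using the same endpoint and field names (Title, Source, Summary) as the script below:

import os
import requests

# POST a single record to Teable; "typecast": True lets Teable coerce field types
resp = requests.post(
    f"https://app.teable.io/api/table/{os.environ['TEABLE_TABLE_ID']}/record",
    headers={"Authorization": f"Bearer {os.environ['TEABLE_TOKEN']}"},
    json={
        "typecast": True,
        "records": [{
            "fields": {
                "Title": "Test record",
                "Source": "https://example.com",
                "Summary": "A one-sentence summary for testing.",
            }
        }],
    },
)
resp.raise_for_status()
print(resp.json())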

Python

Modify the Python script in bookmark-summary:

import re
from typing import List, Optional
import requests
import json
from datetime import datetime
from pathlib import Path
from dataclasses import dataclass, asdict
import os
import logging
import time
from functools import wraps
from urllib.parse import quote
import http.client
# -- configurations begin --
BOOKMARK_COLLECTION_REPO_NAME: str = "bookmark-collection"
BOOKMARK_SUMMARY_REPO_NAME: str = "bookmark-summary"
TEABLE_TABLE_ID: str = os.environ.get('TEABLE_TABLE_ID')
TEABLE_TOKEN: str = os.environ.get('TEABLE_TOKEN')
# -- configurations end --
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(filename)s:%(lineno)d - %(funcName)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
def log_execution_time(func):
    @wraps(func)
    def wrapper(*args, **kwargs):
        logging.info(f'Entering {func.__name__}')
        start_time = time.time()
        result = func(*args, **kwargs)
        end_time = time.time()
        elapsed_time = end_time - start_time
        logging.info(f'Exiting {func.__name__} - Elapsed time: {elapsed_time:.4f} seconds')
        return result
    return wrapper
@dataclass
class SummarizedBookmark:
    year: str
    month: str  # yyyyMM
    title: str
    url: str
    timestamp: int  # unix timestamp
    summary: str
CURRENT_YEAR: str = datetime.now().strftime('%Y')
CURRENT_MONTH: str = datetime.now().strftime('%m')
CURRENT_DATE: str = datetime.now().strftime('%Y-%m-%d')
CURRENT_DATE_AND_TIME: str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
@log_execution_time
def get_text_content(url: str) -> str:
    jina_url: str = f"https://r.jina.ai/{url}"
    response: requests.Response = requests.get(jina_url)
    return response.text
@log_execution_time
def call_openai_api(prompt: str, content: str) -> str:
    model: str = os.environ.get('OPENAI_API_MODEL', 'gpt-4o-mini')
    headers: dict = {
        "Authorization": f"Bearer {os.environ['OPENAI_API_KEY']}",
        "Content-Type": "application/json"
    }
    data: dict = {
        "model": model,
        "messages": [
            {"role": "system", "content": prompt},
            {"role": "user", "content": content}
        ]
    }
    api_endpoint: str = os.environ.get('OPENAI_API_ENDPOINT', 'https://api.openai.com/v1/chat/completions')
    response: requests.Response = requests.post(api_endpoint, headers=headers, data=json.dumps(data))
    return response.json()['choices'][0]['message']['content']
'''
def clean_prompt(prompt: str) -> str:
    """Clean and validate the prompt format."""
    # Strip surrounding whitespace
    prompt = prompt.strip()
    # Make sure the XML declaration is on the first line
    if not prompt.startswith('<?xml'):
        prompt = '<?xml version="1.0" encoding="UTF-8"?>\n' + prompt
    # Validate the XML format
    try:
        from xml.etree import ElementTree
        ElementTree.fromstring(prompt)
    except ElementTree.ParseError as e:
        logging.warning(f"Prompt XML format warning: {e}")
    return prompt
'''
@log_execution_time
def summarize_text(text: str) -> str:
    prompt: str = """
{#- 用简体中文进行文章摘要 -#}
## Profile:​
- author: Vandee​
- role: 文章内容深度总结思考助手
- language: 中文​
- description: 全面的总结文章的主要观点,并结合严谨的逻辑思维分析文章要点,剖析文章内容。
## Goals:
- 第一步,仔细阅读文章内容。
- 第二步,对每个段落进行总结,总结文章的主要内容,理清楚作者表达了什么观点、作者解决了那些具体的问题。
- 第三步,文章要点总结。根据原文内容,提炼出文章的5个以内的主要观点或作者解决的问题。
- 第四步,根据上面三步,按照指定的输出格式,整理出文章内容的总结。
## Constrains:​
- 文章内容总结的{摘要}字数控制在380个中文汉字以内。
- 尽可能还原文章中的专业词汇,并对其进行通俗解释。
- 在总结的过程中,完全按照文章作者的表达内容进行整理,不添加你的额外观点。
- 所有输出用中文生成。
- 文章内容里的“我”是文章的原作者,不要代入 Vandee 的身份。
## Skills:​
- 善于用流畅通顺的简体中文总结内容重点。
- 具有良好的逻辑思维能力,能够深入分析文章内容。
- 掌握文章相关领域的专业知识,能够准确理解和阐述专业概念。
- 擅长以通俗易懂的方式解释复杂的专业内容。
## Workflows:​
- 逐段阅读文章内容。
- 总结文章的内容并生成{摘要}。这一步你需要全面理解文章内容的主题、内容的逻辑框架、作者的提出的观点,摘要不少于270个中文汉字。
- 再次回顾原文所有内容,在上一步总结出{摘要}的基础上,进行深入分析。这一步你需要理清这些内容之间的逻辑关系、专业概念、名词概念,并着重关注原文内容里多次出现的词汇或概念,特别关注作者提出了什么观点、作者解决了那些具体的问题、作者体悟出了哪些道理、作者得出了什么重大的研究结论,最后梳理出{精炼内容}。
- 根据原文内容和你上一步的{精炼内容},提炼出文章的至少4个要点生成{要点总结},你不用输出{精炼内容}。
- 你需要按照markdown有序列表的格式列出上一步{要点总结}中的要点,并根据要点所在的原文并严格根据文章内容扩展对该要点的解析,方便读者理解这些要点的意思。
- 按照指定的输出格式,整理出文章内容的总结。“摘要”和“要点总结”只需要按照markdown格式加粗,不要用标题格式。
## OutputFormat:
**摘要**:
{摘要}
**要点总结**:
{要点总结}
"""
    result = call_openai_api(prompt, text)  # call the API and keep the result
    time.sleep(1)  # wait one second
    return result  # return the result
@log_execution_time
def one_sentence_summary(text: str) -> str:
    prompt: str = "以下是对一篇长文的列表形式总结。请基于此输出对该文章的简短总结,长度不超过100个字。总是使用简体中文输出。"
    return call_openai_api(prompt, text)
def slugify(text: str) -> str:
    invalid_fs_chars: str = '/\\:*?"<>|'
    return re.sub(r'[' + re.escape(invalid_fs_chars) + r'\s]+', '-', text.lower()).strip('-')
def get_summary_file_path(title: str, timestamp: int, year: Optional[str] = None, month: Optional[str] = None, in_readme_md: bool = False) -> Path:
    date_str = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d')
    summary_filename: str = f"{date_str}-{slugify(title)}.md"
    if year is None:
        year = CURRENT_YEAR
    if month is None:
        month = CURRENT_MONTH
    if in_readme_md:
        root: Path = Path(year, month)  # path layout is year/month
    else:
        root: Path = Path(BOOKMARK_SUMMARY_REPO_NAME, year, month)  # path layout is year/month
    return Path(root, summary_filename)
def get_text_content_path(title: str, in_summary_md: bool = False) -> Path:
    text_content_filename: str = f"{CURRENT_DATE}-{slugify(title)}_raw.md"
    root: Path = Path(BOOKMARK_SUMMARY_REPO_NAME, CURRENT_YEAR, CURRENT_MONTH)  # path layout is YEAR/MONTH
    if in_summary_md:
        root = Path(".")
    return Path(root, text_content_filename)
def build_summary_file(title: str, url: str, summary: str, one_sentence: str) -> str:
    """构建总结文件的内容。"""
    return f"""# {title}
- URL: {url}
- Added At: {CURRENT_DATE_AND_TIME}
- [Link To Text]({get_text_content_path(title, in_summary_md=True)})
## Summary
{summary}
"""
def build_index_md(title: str, url: str, summary: str, one_sentence: str, text_content: str) -> str:
    """构建 index.md 文件内容,添加 YAML 头部并包含全文内容。"""
    # 处理标题中的冒号
    yaml_safe_title = title.replace(':', '-')
    return f"""---
title: {yaml_safe_title}
date: {CURRENT_DATE}
extra:
  source: {url}
  original_title: {title}
---
## Summary
{summary}
## Full Content
{text_content}
"""
def build_summary_readme_md(summarized_bookmarks: List[SummarizedBookmark]) -> str:
    initial_prefix: str = """# Clip
总会有一些没达到我想收录到PKM体系里标准的文章,但又弃之可惜。介于这两者之间的,就放在这个clip里了。区别于笔记,这里主要是原文的 Markdown。
Inspired by :[Owen's Clip](https://github.com/theowenyoung/clip) , [LLM x 书签收藏:摘要 & 全文索引 - Nekonull's Garden](https://nekonull.me/posts/llm_x_bookmark/)
## Summarized Bookmarks
"""
    summary_list: str = ""
    sorted_summarized_bookmarks = sorted(summarized_bookmarks, key=lambda bookmark: bookmark.timestamp, reverse=True)
    for bookmark in sorted_summarized_bookmarks:
        summary_file_path = get_summary_file_path(
            title=bookmark.title,
            timestamp=bookmark.timestamp,
            month=bookmark.month,
            in_readme_md=True
        )
        summary_list += f"- ({datetime.fromtimestamp(bookmark.timestamp).strftime('%Y-%m-%d')}) [{bookmark.title}]({summary_file_path})\n"
    return initial_prefix + summary_list
@log_execution_time
def post_to_teable(title: str, url: str, one_sentence: str) -> None:
    """
    Post a bookmark record to Teable
    """
    conn = None
    try:
        conn = http.client.HTTPSConnection("app.teable.io")
        payload = {
            "typecast": True,
            "records": [{
                "fields": {
                    "Title": title,
                    "Source": url,
                    "Summary": one_sentence,
                }
            }]
        }
        headers = {
            'Authorization': f"Bearer {TEABLE_TOKEN}",
            'Content-Type': "application/json"
        }
        conn.request(
            "POST",
            f"/api/table/{TEABLE_TABLE_ID}/record",
            json.dumps(payload),
            headers
        )
        response = conn.getresponse()
        if response.status not in (200, 201):
            logging.error(f"Failed to post to Teable. Status: {response.status}, Response: {response.read().decode()}")
        else:
            logging.info("Successfully posted to Teable")
    except Exception as e:
        logging.error(f"Error posting to Teable: {str(e)}")
    finally:
        if conn:
            conn.close()
@log_execution_time
def process_bookmark_file():
    # Read the bookmarks and the already-summarized bookmarks
    with open(f'{BOOKMARK_COLLECTION_REPO_NAME}/README.md', 'r', encoding='utf-8') as f:
        bookmark_lines = f.readlines()
    with open(f'{BOOKMARK_SUMMARY_REPO_NAME}/data.json', 'r', encoding='utf-8') as f:
        summarized_bookmark_dicts = json.load(f)
        summarized_bookmarks = [SummarizedBookmark(**bookmark) for bookmark in summarized_bookmark_dicts]
    summarized_urls = {bookmark.url for bookmark in summarized_bookmarks}
    # Find the first bookmark that has not been summarized yet
    title, url = None, None
    for line in bookmark_lines:
        match = re.search(r'- \[(.*?)\]\((.*?)\)', line)
        if match and match.group(2) not in summarized_urls:
            title, url = match.groups()
            break
    # Exit if no new bookmark was found
    if not title or not url:
        logging.info("No new bookmarks to summarize.")
        return
    # Slugify the title for use in filenames
    title_slug = slugify(title)
    # Create the YEAR/MONTH/ directory
    monthly_path = Path(f'{BOOKMARK_SUMMARY_REPO_NAME}/{CURRENT_YEAR}/{CURRENT_MONTH}')
    monthly_path.mkdir(parents=True, exist_ok=True)
    # Create the content/YEAR/MONTH/TITLE/ directory
    content_path = Path(f'{BOOKMARK_SUMMARY_REPO_NAME}/content/{CURRENT_YEAR}/{CURRENT_MONTH}/{title_slug}')
    content_path.mkdir(parents=True, exist_ok=True)
    # Fetch and summarize the content
    text_content = get_text_content(url)
    summary = summarize_text(text_content)
    one_sentence = one_sentence_summary(summary)
    timestamp = int(datetime.now().timestamp())
    # Build a filename prefix from the current date
    date_prefix = datetime.now().strftime('%Y-%m-%d-')
    # Save the raw content to YEAR/MONTH/yyyy-MM-dd-title_raw.md
    with open(monthly_path / f"{date_prefix}{title_slug}_raw.md", 'w', encoding='utf-8') as f:
        f.write(text_content)
    # Save the summary to YEAR/MONTH/yyyy-MM-dd-title.md
    summary_content = build_summary_file(title, url, summary, one_sentence)
    with open(monthly_path / f"{date_prefix}{title_slug}.md", 'w', encoding='utf-8') as f:
        f.write(summary_content)
    # Save index.md to content/YEAR/MONTH/TITLE/index.md
    index_content = build_index_md(title, url, summary, one_sentence, text_content)
    with open(content_path / "index.md", 'w', encoding='utf-8') as f:
        f.write(index_content)
    # Update the summarized-bookmarks data
    summarized_bookmarks.append(SummarizedBookmark(
        title=title,
        url=url,
        summary=one_sentence,
        year=CURRENT_YEAR,
        month=CURRENT_MONTH,
        timestamp=timestamp
    ))
    # Update Bookmarks_List.md and data.json
    with open(f'{BOOKMARK_SUMMARY_REPO_NAME}/Bookmarks_List.md', 'w', encoding='utf-8') as f:
        f.write(build_summary_readme_md(summarized_bookmarks))
    with open(f'{BOOKMARK_SUMMARY_REPO_NAME}/data.json', 'w', encoding='utf-8') as f:
        json.dump([asdict(bookmark) for bookmark in summarized_bookmarks], f, indent=2, ensure_ascii=False)
    # Post to Teable
    if TEABLE_TOKEN and TEABLE_TABLE_ID:
        post_to_teable(title, url, one_sentence)
    else:
        logging.warning("Teable API token or table ID not set, skipping Teable update")
def main():
    process_bookmark_file()
if __name__ == "__main__":
    main()

Workflow

Modify the YAML in the bookmark-collection repository:

name: Bookmark Summary
on:
  workflow_dispatch:  # keep only manual runs and dispatches from other workflows
concurrency:
  group: mygroup
  cancel-in-progress: false
jobs:
  summarize:
    runs-on: ubuntu-latest
    steps:
      - name: Checkout bookmark-collection
        uses: actions/checkout@v2
        with:
          path: bookmark-collection
      - name: Checkout bookmark-summary
        uses: actions/checkout@v2
        with:
          repository: VandeeFeng/bookmark-summary
          path: bookmark-summary
          token: ${{ secrets.PAT }}
      - name: Set up Python
        uses: actions/setup-python@v2
        with:
          python-version: '3.x'
      - name: Install dependencies with retry
        uses: nick-fields/retry@v3  # wrap the dependency install in retry
        with:
          timeout_minutes: 2  # timeout per attempt (2 minutes)
          max_attempts: 3  # maximum number of attempts
          command: |
            python -m pip install --upgrade pip
            pip install requests waybackpy
      - name: Process changes with retry
        uses: nick-fields/retry@v3  # wrap the change-processing step in retry
        with:
          timeout_minutes: 2  # timeout per attempt (2 minutes)
          max_attempts: 3  # maximum number of attempts
          command: |
            OPENAI_API_KEY=${{ secrets.OPENAI_API_KEY }} \
            OPENAI_API_MODEL=${{ secrets.OPENAI_API_MODEL }} \
            OPENAI_API_ENDPOINT=${{ secrets.OPENAI_API_ENDPOINT }} \
            TEABLE_TABLE_ID=${{ secrets.TEABLE_TABLE_ID }} \
            TEABLE_TOKEN=${{ secrets.TEABLE_TOKEN }} \
            python bookmark-summary/process_changes.py
      - name: Commit changes to bookmark-summary
        run: |
          cd bookmark-summary
          git config --local user.email "action@github.com"
          git config --local user.name "GitHub Action"
          git add .
          git commit -m "Add new summaries" || echo "No changes to commit"
          git push

Bash

While I was at it, I also moved my websites list into teable with a bash script. I used to keep it with org-capture, but the list has grown too large to search comfortably; I had Claude convert the old list into a CSV and imported it into teable.

#!/bin/bash
while true; do
  # Prompt for the record fields
  read -p "Enter Name (or type 'q' to quit): " name
  if [[ "$name" == "q" ]]; then
    echo "Exiting..."
    break
  fi
  read -p "Enter Intro (or type 'q' to quit): " intro
  if [[ "$intro" == "q" ]]; then
    echo "Exiting..."
    break
  fi
  read -p "Enter Source (or type 'q' to quit): " source
  if [[ "$source" == "q" ]]; then
    echo "Exiting..."
    break
  fi
  # Send the POST request; the heredoc payload mirrors the Teable record API
  # used above (field names Name/Intro/Source are assumed to match the table)
  curl --request POST \
    --url https://app.teable.io/api/table/TEABLE_ID/record \
    --header 'Authorization: Bearer TEABLE_TOKEN' \
    --header 'content-type: application/json' \
    --data "$(cat <<EOF
{
  "typecast": true,
  "records": [
    {
      "fields": {
        "Name": "$name",
        "Intro": "$intro",
        "Source": "$source"
      }
    }
  ]
}
EOF
)"
done

PocketBase

Add two secrets to the repository, POCKETBASE_TOKEN and POCKETBASE_API, filled with your pocketbase API URL and the request-header token.
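(POCKETBASE_API is the collection's record endpoint; for PocketBase it typically has the form https://your-host/api/collections/<collection>/records.)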

This request header took me quite a while with the docs to figure out:

In the target collection's API Rules, manually add @request.headers.x_token = "Your_token"; the value you put there is your POCKETBASE_TOKEN. POCKETBASE_API is shown inside each collection.

Because a custom header is specified here, the Python side needs to handle it explicitly:

second_response = requests.post(
    url=os.environ['POCKETBASE_API'],
    headers={
        "x_token": f"{os.environ['POCKETBASE_TOKEN']}",
        "Content-Type": "application/json"
    },
    json={
        "URL": url,
        "title": title
    }
)

Docker deployment on a VPS

version: "3.7"
services:
  pocketbase:
    image: ghcr.io/muchobien/pocketbase:latest
    container_name: pocketbase
    restart: unless-stopped
    ports:
      - "8090:8090"
    volumes:
      - "./data:/pb_data"
    healthcheck: #optional (recommended) since v0.10.0
      test: wget --no-verbose --tries=1 --spider http://localhost:8090/api/health || exit 1
      interval: 5s
      timeout: 5s
      retries: 5
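After docker compose up -d, pocketbase listens on port 8090; the healthcheck above polls the /api/health endpoint, and the admin UI is served under /_/ on the same port.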

Python

Modify the Python script in the bookmark-collection repository:

import os
import re
import logging
import requests
from github import Github
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(message)s',
    datefmt='%Y-%m-%d %H:%M:%S'
)
def get_added_content():
    """Collect the lines added across the markdown files in this commit."""
    try:
        g = Github(os.environ['GITHUB_TOKEN'])
        repo = g.get_repo(os.environ['GITHUB_REPOSITORY'])
        commit_sha = os.environ.get('GITHUB_SHA')
        commit = repo.get_commit(commit_sha)
        added_lines = []
        for file in commit.files:
            if file.filename.endswith('.md') and file.patch:
                added_lines.extend(line[1:] for line in file.patch.split('\n')
                                   if line.startswith('+') and not line.startswith('+++'))
        return added_lines
    except Exception as e:
        logging.error(f"Error getting commit content: {str(e)}")
        raise
def extract_url_and_title(line: str):
    """Extract the URL and title from a line; supports markdown links [title](url)"""
    # Pattern that captures both the title and the URL
    pattern = r'\[(.*?)\]\((https?://[\w\-._~:/?#\[\]@!$&\'()*+,;=.%]+)\)'
    match = re.search(pattern, line)
    if match:
        title = match.group(1)
        url = match.group(2)
        return url, title
    return None, None
def has_clip_tag(line: str) -> bool:
    """Check whether the line contains the #2clip tag"""
    return bool(re.search(r'#2clip\b', line))
def trigger_workflow():
    """Trigger the other workflow"""
    try:
        g = Github(os.environ['GITHUB_TOKEN'])
        repo = g.get_repo(os.environ['GITHUB_REPOSITORY'])
        workflow = repo.get_workflow("bookmark_summary.yml")
        workflow.create_dispatch("main")
        logging.info("Successfully triggered the bookmark_summary workflow")
    except Exception as e:
        logging.error(f"Failed to trigger workflow: {str(e)}")
        raise
def main():
    try:
        # Collect all newly added lines
        added_lines = get_added_content()
        if not added_lines:
            logging.info("No new markdown content found in this commit")
            return
        trigger_needed = False
        # Process each added line
        for line in added_lines:
            line = line.strip()
            logging.info(f"Processing line: {line}")
            # Check for the tag
            if has_clip_tag(line):
                trigger_needed = True
                logging.info("Found #2clip tag")
            # Extract and process the URL and title (whether or not the line has the tag)
            url, title = extract_url_and_title(line)
            if url:
                try:
                    # Send to Readwise
                    response = requests.post(
                        url="https://readwise.io/api/v3/save/",
                        headers={"Authorization": f"Token {os.environ['READWISE_TOKEN']}"},
                        json={
                            "url": url,
                            "tags": ["Bookmark"]
                        }
                    )
                    response.raise_for_status()
                    logging.info(f"Successfully saved URL to Readwise: {url}")
                    # Send to the second endpoint (pocketbase)
                    second_response = requests.post(
                        url=os.environ['POCKETBASE_API'],
                        headers={
                            "x_token": f"{os.environ['POCKETBASE_TOKEN']}",
                            "Content-Type": "application/json"
                        },
                        json={
                            "URL": url,
                            "title": title
                        }
                    )
                    second_response.raise_for_status()
                    logging.info(f"Successfully saved URL to pocketbase: {url}")
                except requests.exceptions.RequestException as e:
                    logging.error(f"Failed to save URL {url}: {str(e)}")
        # If the tag was found, trigger the workflow
        if trigger_needed:
            logging.info("Triggering workflow due to #2clip tag")
            trigger_workflow()
    except Exception as e:
        logging.error(f"Error: {str(e)}")
        raise
if __name__ == "__main__":
    main()

Workflow

Modify the YAML in the bookmark-collection repository:

name: Save Bookmark to Readwise
on:
  push:
    branches:
      - main
    paths:
      - '**.md'
  workflow_dispatch:
permissions:
  contents: read
  actions: write
jobs:
  save-to-readwise:
    runs-on: ubuntu-latest
    steps:
    - name: Checkout repository
      uses: actions/checkout@v4
      with:
        token: ${{ secrets.GITHUB_TOKEN }}
    - name: Set up Python
      uses: actions/setup-python@v4
      with:
        python-version: '3.10'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install requests PyGithub
    - name: Run bookmark saver
      env:
        READWISE_TOKEN: ${{ secrets.READWISE_TOKEN }}
        GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        GITHUB_REPOSITORY: ${{ github.repository }}
        POCKETBASE_API: ${{ secrets.POCKETBASE_API }}
        POCKETBASE_TOKEN: ${{ secrets.POCKETBASE_TOKEN }}
      run: python save_to_readwise.py

Readwise highlights

I wrote a ReadwiseAPI class so other projects can import it; it can fetch the title and URL of all my highlights on a schedule.

Later on, the highlights can be imported straight into pocketbase.

Python

import requests
import json
from datetime import datetime, timedelta
import os
from typing import List, Dict, Optional
from pathlib import Path
import re
from github import Github
import argparse
class ReadwiseAPI:
    """Readwise API client for exporting highlights with smart update capability and GitHub integration"""
    def __init__(self):
        # Initialize Readwise token
        self.readwise_token = os.environ.get("READWISE_TOKEN")
        if not self.readwise_token:
            raise ValueError("READWISE_TOKEN not found in environment variables")
        # Initialize GitHub token
        self.github_token = os.environ.get("GITHUB_TOKEN")
        if not self.github_token:
            raise ValueError("GITHUB_TOKEN not found in environment variables")
        # Get repository from GitHub Actions environment variable
        self.github_repo = os.environ.get("GITHUB_REPOSITORY")
        if not self.github_repo:
            raise ValueError("Not running in GitHub Actions environment (GITHUB_REPOSITORY not found)")
        # Initialize GitHub client
        self.github = Github(self.github_token)
        self.repo = self.github.get_repo(self.github_repo)
        # Initialize Readwise API settings
        self.base_url = "https://readwise.io/api/v2"
        self.headers = {
            "Authorization": f"Token {self.readwise_token}"
        }
        self.last_update_file = "last_update.json"
        self.articles_file = "articles.json"
    def get_highlights(self, updated_after: Optional[datetime] = None,
                      start_date: Optional[datetime] = None,
                      end_date: Optional[datetime] = None) -> Dict:
        """Get all highlights with their associated metadata"""
        endpoint = f"{self.base_url}/export/"
        params = {}
        if updated_after:
            params["updated_after"] = updated_after.isoformat()
        elif start_date:
            params["updated_after"] = start_date.isoformat()
            if end_date:
                params["updated_before"] = end_date.isoformat()
        print(f"Fetching highlights with params: {params}")
        response = requests.get(endpoint, headers=self.headers, params=params)
        response.raise_for_status()
        return response.json()
    def get_file_content(self, path: str) -> Optional[str]:
        """Get file content from GitHub repository"""
        try:
            content = self.repo.get_contents(path)
            return content.decoded_content.decode('utf-8')
        except Exception as e:
            print(f"File {path} not found in repository: {e}")
            return None
    def update_file(self, path: str, content: str, message: str):
        """Update or create file in GitHub repository"""
        try:
            # Try to get existing file
            file = self.repo.get_contents(path)
            # Update existing file
            self.repo.update_file(
                path=path,
                message=message,
                content=content,
                sha=file.sha
            )
        except Exception:
            # Create new file if it doesn't exist
            self.repo.create_file(
                path=path,
                message=message,
                content=content
            )
    def clean_title(self, title: str) -> str:
        """Clean title by removing newlines and extra spaces"""
        title = re.sub(r'\s+', ' ', title.replace('\n', ' '))
        return title.strip()
    def create_article_json(self, highlights_data: Dict) -> List[Dict]:
        """Create a list of articles with title and URL, only for category 'articles'"""
        articles = []
        for article in highlights_data.get('results', []):
            if article.get('category', '').lower() == 'articles':
                title = self.clean_title(article.get('title', 'Untitled'))
                url = article.get('source_url', '')
                articles.append({
                    'title': title,
                    'url': url
                })
        return articles
    def load_last_update_from_github(self) -> Optional[datetime]:
        """Load the last update date from GitHub"""
        content = self.get_file_content(self.last_update_file)
        if content:
            try:
                data = json.loads(content)
                return datetime.strptime(data['last_update'], '%Y-%m-%d')
            except Exception as e:
                print(f"Error parsing last update file: {e}")
                return None
        return None
    def save_last_update_to_github(self):
        """Save current date as last update date to GitHub"""
        current_date = datetime.now().strftime('%Y-%m-%d')
        content = json.dumps({'last_update': current_date})
        self.update_file(
            path=self.last_update_file,
            content=content,
            message="Update last sync date"
        )
    def load_existing_articles_from_github(self) -> List[Dict]:
        """Load existing articles from GitHub"""
        content = self.get_file_content(self.articles_file)
        if content:
            try:
                return json.loads(content)
            except Exception as e:
                print(f"Error parsing articles file: {e}")
                return []
        return []
    def merge_articles(self, existing_articles: List[Dict], new_articles: List[Dict]) -> List[Dict]:
        """Merge new articles with existing ones, avoiding duplicates"""
        existing_set = {(article['title'], article['url']) for article in existing_articles}
        for article in new_articles:
            article_tuple = (article['title'], article['url'])
            if article_tuple not in existing_set:
                existing_articles.append(article)
                existing_set.add(article_tuple)
        return existing_articles
    def export_articles(self, start_date: Optional[str] = None,
                       end_date: Optional[str] = None,
                       all_time: bool = False):
        """
        Export articles to GitHub with smart update capability
        Args:
            start_date: Optional start date in YYYY-MM-DD format
            end_date: Optional end date in YYYY-MM-DD format
            all_time: If True, fetch all highlights regardless of dates
        """
        if all_time:
            # With all_time, force-fetch every highlight and ignore the last update time
            print("Fetching all highlights from the beginning")
            highlights_data = self.get_highlights()
        elif start_date:
            # If a start date was given, use that date range
            start_datetime = datetime.strptime(start_date, '%Y-%m-%d')
            end_datetime = datetime.strptime(end_date, '%Y-%m-%d') if end_date else datetime.now()
            print(f"Fetching highlights from {start_date} to {end_date or 'now'}")
            highlights_data = self.get_highlights(start_date=start_datetime, end_date=end_datetime)
        else:
            # Otherwise, do an incremental update based on the last update time
            last_update = self.load_last_update_from_github()
            if last_update:
                days_since_update = (datetime.now() - last_update).days
                print(f"Last update was {days_since_update} days ago on {last_update.strftime('%Y-%m-%d')}")
                if days_since_update > 0:
                    print(f"Fetching highlights updated after {last_update.strftime('%Y-%m-%d')}")
                    highlights_data = self.get_highlights(updated_after=last_update)
                else:
                    print("Already updated today, no need to fetch new articles")
                    return
            else:
                print("No previous update found, fetching all articles")
                highlights_data = self.get_highlights()
        # Create article data
        new_articles = self.create_article_json(highlights_data)
        print(f"Found {len(new_articles)} new articles")
        # Load existing articles
        existing_articles = self.load_existing_articles_from_github()
        print(f"Found {len(existing_articles)} existing articles")
        # Merge new articles with existing ones
        merged_articles = self.merge_articles(existing_articles, new_articles)
        print(f"Total unique articles after merge: {len(merged_articles)}")
        # Save merged articles to GitHub
        self.update_file(
            path=self.articles_file,
            content=json.dumps(merged_articles, ensure_ascii=False, indent=2),
            message="Update articles list"
        )
        # Update the last update date
        if not start_date and not all_time:  # only advance the last-sync date for plain incremental runs
            self.save_last_update_to_github()
        print(f"Successfully updated articles in GitHub repository")
        if new_articles:
            print("New articles added:")
            for article in new_articles:
                print(f"- {article['title']}")
def main():
    # Read the GitHub Actions input parameters from environment variables
    gh_start_date = os.environ.get('INPUT_START_DATE', '')
    gh_end_date = os.environ.get('INPUT_END_DATE', '')
    gh_all_time = os.environ.get('INPUT_ALL_TIME', '').lower() == 'true'
    # Set up the command-line argument parser
    parser = argparse.ArgumentParser(description='Sync Readwise highlights to GitHub')
    parser.add_argument('--start-date', type=str, help='Start date in YYYY-MM-DD format')
    parser.add_argument('--end-date', type=str, help='End date in YYYY-MM-DD format')
    parser.add_argument('--all-time', action='store_true', help='Fetch all highlights from the beginning')
    args = parser.parse_args()
    # Prefer command-line arguments, falling back to the GitHub Actions inputs
    start_date = args.start_date or gh_start_date
    end_date = args.end_date or gh_end_date
    all_time = args.all_time or gh_all_time
    try:
        client = ReadwiseAPI()
        client.export_articles(
            start_date=start_date if start_date else None,
            end_date=end_date if end_date else None,
            all_time=all_time
        )
    except Exception as e:
        print(f"An error occurred: {str(e)}")
        raise
if __name__ == "__main__":
    main()
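For reuse elsewhere, the class only needs the two tokens in the environment. A minimal usage sketch, assuming the script above is saved as readwise_sync.py as in the workflow below (the repository name is a placeholder):

import os
from readwise_sync import ReadwiseAPI

os.environ.setdefault("GITHUB_REPOSITORY", "user/highlights-repo")  # placeholder repo
client = ReadwiseAPI()  # reads READWISE_TOKEN and GITHUB_TOKEN from the environment
client.export_articles(start_date="2024-01-01")  # or all_time=True for a full export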

Workflow

name: Sync Readwise Articles
on:
  schedule:
    # Run daily at 01:00 UTC (09:00 Beijing time)
    - cron: '0 1 * * *'
  # Allow manual runs, with input parameters
  workflow_dispatch:
    inputs:
      start_date:
        description: 'Start date (YYYY-MM-DD, e.g., 2024-01-01)'
        required: false
        type: string
        default: ''
      end_date:
        description: 'End date (YYYY-MM-DD, leave empty for current date)'
        required: false
        type: string
        default: ''
      all_time:
        description: 'Fetch all highlights (overrides date range if selected)'
        type: boolean
        required: false
        default: false
permissions:
  contents: write      # read/write access to repository contents
jobs:
  sync:
    runs-on: ubuntu-latest
    steps:
    - name: Checkout repository
      uses: actions/checkout@v4
    - name: Set up Python
      uses: actions/setup-python@v5
      with:
        python-version: '3.10'
        cache: 'pip'
        cache-dependency-path: '**/requirements.txt'
    - name: Install dependencies
      run: |
        python -m pip install --upgrade pip
        pip install -r requirements.txt
    - name: Run sync script
      env:
        READWISE_TOKEN: ${{ secrets.READWISE_TOKEN }}
        GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
        INPUT_START_DATE: ${{ github.event.inputs.start_date }}
        INPUT_END_DATE: ${{ github.event.inputs.end_date }}
        INPUT_ALL_TIME: ${{ github.event.inputs.all_time }}
      run: python readwise_sync.py
    - name: Check for changes
      id: verify-changed-files
      run: |
        if [ -n "$(git status --porcelain)" ]; then
          echo "changes_found=true" >> $GITHUB_OUTPUT
        else
          echo "changes_found=false" >> $GITHUB_OUTPUT
        fi
    - name: Commit changes
      if: steps.verify-changed-files.outputs.changes_found == 'true'
      run: |
        git config --local user.email "github-actions[bot]@users.noreply.github.com"
        git config --local user.name "github-actions[bot]"
        git add articles.json last_update.json
        git commit -m "Update Readwise articles [skip ci]" || echo "No changes to commit"
    - name: Push changes
      if: steps.verify-changed-files.outputs.changes_found == 'true'
      uses: ad-m/github-push-action@master
      with:
        github_token: ${{ secrets.GITHUB_TOKEN }}
        branch: ${{ github.ref }}
Tags: PKM Github Python Database