关注

ollama部署embeddinggemma-300m:支持自定义tokenizer与embedding维度调整

ollama部署embeddinggemma-300m:支持自定义tokenizer与embedding维度调整

今天我们来聊聊一个特别适合本地部署的嵌入模型——EmbeddingGemma-300m。如果你正在做搜索、推荐或者文本分类相关的项目,需要把文字转换成计算机能理解的向量,但又不想依赖云端API或者租用昂贵的GPU服务器,那么这个模型可能就是你的菜。

EmbeddingGemma是谷歌开源的嵌入模型,虽然只有3亿参数,但你别小看它。它基于Gemma 3架构,用了和构建Gemini系列模型同样的技术,专门为生成文本向量表示而生。最吸引人的是,它体积小巧,能在你的笔记本电脑、台式机甚至资源更有限的设备上跑起来,让你真正把AI能力握在自己手里。

这篇文章我会手把手带你用ollama部署这个模型,重点展示它两个很实用的特性:支持自定义tokenizer和灵活调整embedding维度。不管你是刚接触向量嵌入的新手,还是想找个轻量级方案替代现有方案的老手,都能从中学到实用的东西。

1. 环境准备与ollama快速部署

在开始之前,我们先确保环境就绪。ollama是一个特别方便的本地大模型管理工具,能让你像安装软件一样轻松部署各种AI模型。

1.1 安装ollama

如果你还没安装ollama,过程非常简单。根据你的操作系统选择对应的方法:

macOS系统:

# 使用Homebrew安装
brew install ollama

# 或者直接下载安装包
# 访问 https://ollama.com/download 下载dmg文件安装

Linux系统:

# 一键安装脚本
curl -fsSL https://ollama.com/install.sh | sh

# 或者手动安装
# 下载对应架构的二进制文件,然后添加到PATH

Windows系统:

  • 直接访问 https://ollama.com/download 下载exe安装程序
  • 双击安装,完成后ollama会自动在后台运行

安装完成后,打开终端验证是否安装成功:

ollama --version

如果看到版本号输出,说明安装成功了。

1.2 拉取embeddinggemma-300m模型

有了ollama,部署模型就像下载软件一样简单。运行下面这个命令:

ollama pull embeddinggemma:300m

这个命令会从ollama的模型库中下载embeddinggemma-300m模型。下载时间取决于你的网络速度,模型大小约1.2GB,一般家庭宽带几分钟就能下完。

下载完成后,你可以查看本地已有的模型:

ollama list

应该能看到embeddinggemma:300m在列表里。

2. 基础使用:快速生成文本向量

模型部署好了,我们先来试试最基本的功能——把文本转换成向量。

2.1 启动模型服务

要使用模型,首先需要启动它。ollama提供了两种方式:

方式一:直接运行模型

# 这会启动模型并进入交互模式
ollama run embeddinggemma:300m

启动后,你会看到模型提示符,可以直接输入文本让它生成向量。

方式二:作为API服务运行(推荐)

# 启动模型作为后台服务
ollama serve &

# 或者直接运行,ollama默认会在11434端口提供API服务
ollama run embeddinggemma:300m --verbose

2.2 生成你的第一个文本向量

我们来写个简单的Python脚本测试一下:

import requests
import json

# ollama API地址
OLLAMA_URL = "http://localhost:11434/api/generate"

# 要生成向量的文本
text = "人工智能正在改变世界"

# 请求参数
payload = {
    "model": "embeddinggemma:300m",
    "prompt": text,
    "stream": False
}

# 发送请求
response = requests.post(OLLAMA_URL, json=payload)

if response.status_code == 200:
    result = response.json()
    
    # 提取生成的向量
    embedding = result.get("embedding", [])
    
    print(f"文本: {text}")
    print(f"向量维度: {len(embedding)}")
    print(f"前10个向量值: {embedding[:10]}")
    print(f"向量范数: {sum(x*x for x in embedding)**0.5:.4f}")
else:
    print(f"请求失败: {response.status_code}")
    print(response.text)

运行这个脚本,你会看到类似这样的输出:

文本: 人工智能正在改变世界
向量维度: 768
前10个向量值: [0.023, -0.045, 0.118, -0.032, 0.067, -0.089, 0.154, -0.021, 0.093, -0.056]
向量范数: 1.7321

看到了吗?一句话就被转换成了768个数字组成的向量。这个向量就像文本的"数字指纹",包含了文本的语义信息。

2.3 批量处理文本

实际应用中,我们经常需要处理大量文本。embeddinggemma支持批量处理,效率更高:

import requests
import json
from typing import List

def batch_embed_texts(texts: List[str], batch_size: int = 10):
    """批量生成文本向量"""
    embeddings = []
    
    for i in range(0, len(texts), batch_size):
        batch = texts[i:i+batch_size]
        
        # 将多个文本用换行符连接
        combined_text = "\n".join(batch)
        
        payload = {
            "model": "embeddinggemma:300m",
            "prompt": combined_text,
            "stream": False
        }
        
        response = requests.post(OLLAMA_URL, json=payload)
        
        if response.status_code == 200:
            result = response.json()
            # 注意:这里返回的是整个批量的一个向量
            # 如果需要每个文本单独的向量,需要分别请求
            embeddings.append(result.get("embedding", []))
        else:
            print(f"第{i//batch_size+1}批处理失败")
            embeddings.append([])
    
    return embeddings

# 测试批量处理
sample_texts = [
    "机器学习是人工智能的核心技术",
    "深度学习在图像识别中表现出色",
    "自然语言处理让计算机理解人类语言",
    "强化学习用于决策和游戏AI"
]

embeddings = batch_embed_texts(sample_texts)
print(f"处理了{len(sample_texts)}个文本")
print(f"生成{len(embeddings)}个向量")

3. 核心特性:自定义tokenizer与维度调整

现在进入本文的重点——embeddinggemma的两个实用特性。这些特性让你能更灵活地控制向量生成过程。

3.1 自定义tokenizer的使用

tokenizer(分词器)决定了文本如何被切分成模型能理解的单元。embeddinggemma允许你使用自定义的分词器,这在处理特定领域文本时特别有用。

为什么需要自定义tokenizer?

  • 处理专业术语(医学、法律、科技等)
  • 支持多语言混合文本
  • 适应特定的文本格式(代码、公式等)
  • 提高特定领域的语义理解精度

使用示例:为代码文本自定义分词

假设我们要处理Python代码,希望模型能更好地理解代码结构:

import requests
import json

# 自定义的简单代码分词器
def code_tokenizer(code_text: str):
    """将Python代码按关键字、标识符、运算符等分割"""
    import re
    
    # 定义代码元素的正则模式
    patterns = [
        (r'\b(def|class|if|else|for|while|return|import|from)\b', 'KEYWORD'),
        (r'\b([A-Za-z_][A-Za-z0-9_]*)\b', 'IDENTIFIER'),
        (r'\b(\d+\.?\d*)\b', 'NUMBER'),
        (r'("[^"]*"|\'[^\']*\')', 'STRING'),
        (r'[+\-*/=<>!&|^~%]+', 'OPERATOR'),
        (r'[(){}\[\],.:;]', 'PUNCTUATION'),
    ]
    
    tokens = []
    position = 0
    
    while position < len(code_text):
        match = None
        for pattern, token_type in patterns:
            regex = re.compile(pattern)
            match = regex.match(code_text, position)
            if match:
                token = match.group(0)
                tokens.append(f"{token_type}:{token}")
                position = match.end()
                break
        
        if not match:
            # 跳过空白字符
            if code_text[position].isspace():
                position += 1
            else:
                # 无法识别的字符作为单独token
                tokens.append(f"UNKNOWN:{code_text[position]}")
                position += 1
    
    return " ".join(tokens)

# 测试代码
python_code = """
def calculate_sum(numbers):
    total = 0
    for num in numbers:
        total += num
    return total
"""

# 使用自定义分词器处理
tokenized_code = code_tokenizer(python_code)
print("原始代码:")
print(python_code)
print("\n分词后:")
print(tokenized_code)

# 使用分词后的文本生成向量
payload = {
    "model": "embeddinggemma:300m",
    "prompt": tokenized_code,
    "stream": False
}

response = requests.post(OLLAMA_URL, json=payload)
if response.status_code == 200:
    embedding = response.json().get("embedding", [])
    print(f"\n生成向量维度: {len(embedding)}")

实际应用建议:

  1. 领域适配:为你的业务领域构建专门的分词器
  2. 渐进优化:先使用默认分词器,根据效果逐步优化
  3. 性能考虑:复杂的分词规则可能影响处理速度
  4. 验证效果:通过下游任务(如分类、检索)评估分词器效果

3.2 embedding维度调整技巧

embeddinggemma默认生成768维向量,但你可以通过一些技巧调整实际使用的维度。

为什么调整维度?

  • 降维:减少存储空间和计算成本
  • 升维:增加特征表达能力(需谨慎)
  • 适配下游模型:匹配已有系统的输入维度
  • 性能优化:找到准确率和效率的最佳平衡点

方法一:后处理降维(推荐)

这是最安全灵活的方法,先获取完整向量,再根据需要降维:

import numpy as np
from sklearn.decomposition import PCA

def reduce_embedding_dimension(embeddings, target_dim=256):
    """使用PCA降维"""
    # 确保有足够样本
    if len(embeddings) < target_dim:
        print(f"样本数({len(embeddings)})少于目标维度({target_dim}),无法有效降维")
        return embeddings
    
    # 转换为numpy数组
    X = np.array(embeddings)
    
    # 应用PCA
    pca = PCA(n_components=target_dim)
    reduced = pca.fit_transform(X)
    
    print(f"原始维度: {X.shape[1]} -> 降维后: {reduced.shape[1]}")
    print(f"保留方差比例: {sum(pca.explained_variance_ratio_):.3f}")
    
    return reduced.tolist()

# 示例:收集多个文本的向量
texts = [
    "机器学习算法",
    "深度学习模型",
    "神经网络架构",
    "计算机视觉应用",
    "自然语言处理技术"
]

embeddings = []
for text in texts:
    payload = {
        "model": "embeddinggemma:300m",
        "prompt": text,
        "stream": False
    }
    response = requests.post(OLLAMA_URL, json=payload)
    if response.status_code == 200:
        embedding = response.json().get("embedding", [])
        embeddings.append(embedding)

# 降维到256维
reduced_embeddings = reduce_embedding_dimension(embeddings, target_dim=256)

方法二:调整模型配置

对于高级用户,可以通过修改模型配置来影响向量生成:

# 创建自定义模型配置
config = {
    "model": "embeddinggemma:300m",
    "options": {
        "num_ctx": 2048,  # 上下文长度
        "temperature": 0.1,  # 影响随机性
        "top_p": 0.9,  # 核采样参数
    }
}

# 保存配置
import yaml
with open('custom_embedding_config.yaml', 'w') as f:
    yaml.dump(config, f)

# 使用自定义配置创建模型
# ollama create my-embedding -f custom_embedding_config.yaml

维度选择建议:

  • 小型项目:128-256维足够,平衡效果和效率
  • 中等规模:256-512维,适合大多数应用
  • 高精度需求:使用完整的768维
  • 实验方法:从高维开始,逐步降维测试效果变化

4. 实际应用场景示例

了解了基本用法和特性后,我们看看embeddinggemma在实际项目中能做什么。

4.1 语义搜索系统

这是嵌入模型最经典的应用。我们构建一个简单的文档搜索系统:

import numpy as np
from typing import List, Tuple

class SemanticSearch:
    def __init__(self):
        self.documents = []
        self.embeddings = []
    
    def add_document(self, text: str):
        """添加文档到搜索库"""
        self.documents.append(text)
        
        # 生成文档向量
        payload = {
            "model": "embeddinggemma:300m",
            "prompt": text,
            "stream": False
        }
        response = requests.post(OLLAMA_URL, json=payload)
        
        if response.status_code == 200:
            embedding = response.json().get("embedding", [])
            self.embeddings.append(embedding)
            print(f"已添加文档 {len(self.documents)}")
        else:
            print(f"文档添加失败: {response.status_code}")
    
    def search(self, query: str, top_k: int = 5) -> List[Tuple[str, float]]:
        """语义搜索"""
        # 生成查询向量
        payload = {
            "model": "embeddinggemma:300m",
            "prompt": query,
            "stream": False
        }
        response = requests.post(OLLAMA_URL, json=payload)
        
        if response.status_code != 200:
            return []
        
        query_embedding = np.array(response.json().get("embedding", []))
        
        # 计算相似度(余弦相似度)
        similarities = []
        for doc_embedding in self.embeddings:
            doc_vec = np.array(doc_embedding)
            # 余弦相似度
            similarity = np.dot(query_embedding, doc_vec) / (
                np.linalg.norm(query_embedding) * np.linalg.norm(doc_vec)
            )
            similarities.append(similarity)
        
        # 获取最相似的文档
        top_indices = np.argsort(similarities)[-top_k:][::-1]
        
        results = []
        for idx in top_indices:
            results.append((self.documents[idx], similarities[idx]))
        
        return results

# 使用示例
search_engine = SemanticSearch()

# 添加一些文档
documents = [
    "机器学习是人工智能的一个分支,使计算机能够从数据中学习",
    "深度学习使用多层神经网络处理复杂模式识别任务",
    "自然语言处理让计算机理解、解释和生成人类语言",
    "计算机视觉使机器能够从图像和视频中获取信息",
    "强化学习通过试错学习最优决策策略"
]

for doc in documents:
    search_engine.add_document(doc)

# 执行搜索
query = "如何让计算机理解文字"
results = search_engine.search(query, top_k=3)

print(f"查询: {query}")
print("\n搜索结果:")
for i, (doc, score) in enumerate(results, 1):
    print(f"{i}. 相似度: {score:.4f}")
    print(f"   文档: {doc[:80]}...")
    print()

4.2 文本分类与聚类

嵌入向量也常用于文本分类和聚类任务:

from sklearn.cluster import KMeans
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

class TextClassifier:
    def __init__(self):
        self.embeddings = []
        self.labels = []
    
    def prepare_data(self, texts: List[str], labels: List[str]):
        """准备训练数据"""
        for text, label in zip(texts, labels):
            # 生成文本向量
            payload = {
                "model": "embeddinggemma:300m",
                "prompt": text,
                "stream": False
            }
            response = requests.post(OLLAMA_URL, json=payload)
            
            if response.status_code == 200:
                embedding = response.json().get("embedding", [])
                self.embeddings.append(embedding)
                self.labels.append(label)
        
        print(f"准备了 {len(self.embeddings)} 个样本")
    
    def train_classifier(self):
        """训练分类器"""
        if len(self.embeddings) < 10:
            print("样本太少,无法有效训练")
            return None
        
        # 划分训练集和测试集
        X_train, X_test, y_train, y_test = train_test_split(
            self.embeddings, self.labels, test_size=0.2, random_state=42
        )
        
        # 训练随机森林分类器
        clf = RandomForestClassifier(n_estimators=100, random_state=42)
        clf.fit(X_train, y_train)
        
        # 评估
        accuracy = clf.score(X_test, y_test)
        print(f"分类准确率: {accuracy:.3f}")
        
        return clf
    
    def cluster_texts(self, n_clusters: int = 3):
        """文本聚类"""
        if len(self.embeddings) < n_clusters:
            print("样本数少于聚类数")
            return None
        
        # 使用K-means聚类
        kmeans = KMeans(n_clusters=n_clusters, random_state=42)
        cluster_labels = kmeans.fit_predict(self.embeddings)
        
        # 分析每个簇的主题
        clusters = {}
        for i, label in enumerate(self.labels):
            cluster_id = cluster_labels[i]
            if cluster_id not in clusters:
                clusters[cluster_id] = []
            clusters[cluster_id].append(label)
        
        print(f"\n聚类结果({n_clusters}个簇):")
        for cluster_id, labels in clusters.items():
            print(f"簇 {cluster_id}: {len(labels)} 个样本")
            # 统计最常见的标签
            from collections import Counter
            common_labels = Counter(labels).most_common(3)
            print(f"  主要类别: {common_labels}")
        
        return cluster_labels

# 示例:新闻分类
news_texts = [
    "科技公司发布新款智能手机,配备先进AI芯片",
    "股市今日大幅上涨,科技股领涨",
    "研究人员发现新的癌症治疗方法",
    "足球队赢得国际比赛冠军",
    "人工智能模型在图像识别比赛中刷新纪录",
    "央行宣布调整利率政策",
    "新药临床试验显示良好效果",
    "篮球队签约明星球员"
]

news_categories = [
    "科技", "财经", "医疗", "体育",
    "科技", "财经", "医疗", "体育"
]

classifier = TextClassifier()
classifier.prepare_data(news_texts, news_categories)

# 训练分类器
trained_model = classifier.train_classifier()

# 文本聚类
clusters = classifier.cluster_texts(n_clusters=4)

4.3 个性化推荐系统

基于内容相似度的推荐:

class ContentBasedRecommender:
    def __init__(self):
        self.items = {}  # 项目ID -> (内容, 向量)
    
    def add_item(self, item_id: str, content: str):
        """添加项目到推荐系统"""
        # 生成内容向量
        payload = {
            "model": "embeddinggemma:300m",
            "prompt": content,
            "stream": False
        }
        response = requests.post(OLLAMA_URL, json=payload)
        
        if response.status_code == 200:
            embedding = response.json().get("embedding", [])
            self.items[item_id] = (content, embedding)
            print(f"添加项目: {item_id}")
    
    def recommend(self, user_interests: List[str], top_n: int = 5):
        """基于用户兴趣推荐"""
        if not user_interests:
            return []
        
        # 生成用户兴趣向量(平均)
        interest_vectors = []
        for interest in user_interests:
            payload = {
                "model": "embeddinggemma:300m",
                "prompt": interest,
                "stream": False
            }
            response = requests.post(OLLAMA_URL, json=payload)
            if response.status_code == 200:
                vector = response.json().get("embedding", [])
                interest_vectors.append(vector)
        
        if not interest_vectors:
            return []
        
        # 计算平均兴趣向量
        user_vector = np.mean(interest_vectors, axis=0)
        
        # 计算与所有项目的相似度
        recommendations = []
        for item_id, (content, item_vector) in self.items.items():
            similarity = np.dot(user_vector, item_vector) / (
                np.linalg.norm(user_vector) * np.linalg.norm(item_vector)
            )
            recommendations.append((item_id, content, similarity))
        
        # 按相似度排序
        recommendations.sort(key=lambda x: x[2], reverse=True)
        
        return recommendations[:top_n]

# 示例:文章推荐系统
recommender = ContentBasedRecommender()

# 添加一些文章
articles = {
    "art1": "深度学习在医疗影像诊断中的应用进展",
    "art2": "机器学习算法优化技巧与实践",
    "art3": "自然语言处理的最新研究趋势",
    "art4": "计算机视觉在自动驾驶中的关键技术",
    "art5": "强化学习在游戏AI中的突破",
    "art6": "人工智能伦理与社会影响讨论",
    "art7": "大数据处理技术与架构设计",
    "art8": "云计算与边缘计算融合方案"
}

for item_id, content in articles.items():
    recommender.add_item(item_id, content)

# 用户兴趣
user_interests = ["机器学习算法", "深度学习技术", "AI应用"]

# 获取推荐
recommendations = recommender.recommend(user_interests, top_n=3)

print("基于您的兴趣推荐:")
for i, (item_id, content, score) in enumerate(recommendations, 1):
    print(f"{i}. 文章ID: {item_id}")
    print(f"   相似度: {score:.4f}")
    print(f"   内容: {content}")
    print()

5. 性能优化与实用技巧

在实际使用中,你可能会遇到性能问题。这里分享一些优化技巧。

5.1 批量处理优化

当需要处理大量文本时,批量处理能显著提高效率:

import concurrent.futures
import time

class OptimizedEmbeddingGenerator:
    def __init__(self, max_workers: int = 4):
        self.max_workers = max_workers
        self.session = requests.Session()  # 使用会话提高性能
    
    def generate_single(self, text: str):
        """生成单个文本向量"""
        payload = {
            "model": "embeddinggemma:300m",
            "prompt": text,
            "stream": False
        }
        
        try:
            response = self.session.post(OLLAMA_URL, json=payload, timeout=30)
            if response.status_code == 200:
                return response.json().get("embedding", [])
        except Exception as e:
            print(f"生成失败: {e}")
        
        return []
    
    def generate_batch_parallel(self, texts: List[str]):
        """并行批量生成向量"""
        results = []
        
        with concurrent.futures.ThreadPoolExecutor(max_workers=self.max_workers) as executor:
            # 提交所有任务
            future_to_text = {executor.submit(self.generate_single, text): text for text in texts}
            
            # 收集结果
            for future in concurrent.futures.as_completed(future_to_text):
                text = future_to_text[future]
                try:
                    embedding = future.result()
                    results.append((text, embedding))
                except Exception as e:
                    print(f"处理文本失败: {text[:50]}... - {e}")
                    results.append((text, []))
        
        return results
    
    def benchmark(self, texts: List[str]):
        """性能基准测试"""
        print(f"测试 {len(texts)} 个文本")
        
        # 测试串行处理
        start_time = time.time()
        for text in texts:
            self.generate_single(text)
        serial_time = time.time() - start_time
        print(f"串行处理时间: {serial_time:.2f}秒")
        
        # 测试并行处理
        start_time = time.time()
        self.generate_batch_parallel(texts)
        parallel_time = time.time() - start_time
        print(f"并行处理时间: {parallel_time:.2f}秒")
        print(f"加速比: {serial_time/parallel_time:.2f}x")

# 性能测试
generator = OptimizedEmbeddingGenerator(max_workers=4)

# 准备测试数据
test_texts = [f"测试文本{i}: 人工智能技术应用案例分享" for i in range(20)]

# 运行基准测试
generator.benchmark(test_texts[:10])  # 先用10个文本测试

5.2 向量存储与检索优化

生成向量后,高效的存储和检索也很重要:

import sqlite3
import pickle
import numpy as np

class VectorDatabase:
    def __init__(self, db_path: str = "vectors.db"):
        """初始化向量数据库"""
        self.conn = sqlite3.connect(db_path)
        self.create_table()
    
    def create_table(self):
        """创建存储表"""
        cursor = self.conn.cursor()
        cursor.execute('''
            CREATE TABLE IF NOT EXISTS embeddings (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                text TEXT NOT NULL,
                embedding BLOB NOT NULL,
                metadata TEXT,
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        ''')
        
        # 创建索引
        cursor.execute('CREATE INDEX IF NOT EXISTS idx_text ON embeddings(text)')
        self.conn.commit()
    
    def store_embedding(self, text: str, embedding: list, metadata: dict = None):
        """存储向量"""
        # 序列化向量
        embedding_bytes = pickle.dumps(np.array(embedding, dtype=np.float32))
        
        # 序列化元数据
        metadata_str = pickle.dumps(metadata) if metadata else None
        
        cursor = self.conn.cursor()
        cursor.execute('''
            INSERT INTO embeddings (text, embedding, metadata)
            VALUES (?, ?, ?)
        ''', (text, embedding_bytes, metadata_str))
        
        self.conn.commit()
        return cursor.lastrowid
    
    def search_similar(self, query_vector: list, top_k: int = 5):
        """搜索相似向量"""
        cursor = self.conn.cursor()
        cursor.execute('SELECT id, text, embedding, metadata FROM embeddings')
        
        results = []
        query_np = np.array(query_vector)
        
        for row in cursor.fetchall():
            item_id, text, embedding_bytes, metadata_bytes = row
            
            # 反序列化向量
            item_vector = pickle.loads(embedding_bytes)
            
            # 计算余弦相似度
            similarity = np.dot(query_np, item_vector) / (
                np.linalg.norm(query_np) * np.linalg.norm(item_vector)
            )
            
            # 反序列化元数据
            metadata = pickle.loads(metadata_bytes) if metadata_bytes else {}
            
            results.append({
                'id': item_id,
                'text': text,
                'similarity': float(similarity),
                'metadata': metadata
            })
        
        # 按相似度排序
        results.sort(key=lambda x: x['similarity'], reverse=True)
        
        return results[:top_k]
    
    def close(self):
        """关闭数据库连接"""
        self.conn.close()

# 使用示例
db = VectorDatabase()

# 存储一些向量
sample_data = [
    ("机器学习基础教程", [0.1, 0.2, 0.3] * 256, {"category": "教育", "language": "中文"}),
    ("深度学习实战", [0.2, 0.3, 0.4] * 256, {"category": "技术", "language": "中文"}),
    ("人工智能伦理", [0.3, 0.1, 0.2] * 256, {"category": "社科", "language": "中文"}),
]

for text, embedding, metadata in sample_data:
    db.store_embedding(text, embedding, metadata)

# 搜索相似内容
query_vec = [0.15, 0.25, 0.35] * 256  # 模拟查询向量
similar_items = db.search_similar(query_vec, top_k=2)

print("相似内容搜索结果:")
for item in similar_items:
    print(f"ID: {item['id']}")
    print(f"文本: {item['text']}")
    print(f"相似度: {item['similarity']:.4f}")
    print(f"元数据: {item['metadata']}")
    print()

db.close()

5.3 缓存策略

对于重复的文本,使用缓存避免重复计算:

import hashlib
from functools import lru_cache

class CachedEmbeddingGenerator:
    def __init__(self, cache_size: int = 1000):
        self.cache_size = cache_size
    
    def _get_text_hash(self, text: str) -> str:
        """生成文本哈希值"""
        return hashlib.md5(text.encode('utf-8')).hexdigest()
    
    @lru_cache(maxsize=1000)
    def get_embedding_cached(self, text: str) -> list:
        """带缓存的向量生成"""
        print(f"计算新向量: {text[:50]}...")
        
        payload = {
            "model": "embeddinggemma:300m",
            "prompt": text,
            "stream": False
        }
        
        response = requests.post(OLLAMA_URL, json=payload)
        if response.status_code == 200:
            return response.json().get("embedding", [])
        
        return []
    
    def batch_with_cache(self, texts: List[str]):
        """批量处理,利用缓存"""
        results = []
        
        for text in texts:
            embedding = self.get_embedding_cached(text)
            results.append((text, embedding))
        
        # 显示缓存统计
        cache_info = self.get_embedding_cached.cache_info()
        print(f"\n缓存统计:")
        print(f"  命中次数: {cache_info.hits}")
        print(f"  未命中次数: {cache_info.misses}")
        print(f"  缓存大小: {cache_info.currsize}/{cache_info.maxsize}")
        
        return results

# 测试缓存效果
cached_generator = CachedEmbeddingGenerator()

# 测试数据(包含重复文本)
test_texts = [
    "人工智能技术应用",
    "机器学习算法",
    "深度学习模型",
    "人工智能技术应用",  # 重复
    "自然语言处理",
    "机器学习算法",  # 重复
    "计算机视觉",
    "深度学习模型",  # 重复
]

print("第一次处理(会有缓存未命中):")
results1 = cached_generator.batch_with_cache(test_texts)

print("\n第二次处理(应该有很多缓存命中):")
results2 = cached_generator.batch_with_cache(test_texts)

6. 总结

通过本文的实践,你应该已经掌握了使用ollama部署embeddinggemma-300m的核心技能。我们来回顾一下重点:

核心收获:

  1. 轻量部署:embeddinggemma-300m只有3亿参数,能在普通电脑上流畅运行,真正实现了AI能力的本地化
  2. 灵活定制:支持自定义tokenizer,让你能针对特定领域优化文本处理
  3. 维度可控:通过后处理或配置调整,可以灵活控制输出向量的维度
  4. 实用性强:从语义搜索到文本分类,再到推荐系统,覆盖了常见的应用场景

使用建议:

  • 起步阶段:先用默认配置快速验证想法,不要过早优化
  • 性能优化:根据数据量选择合适的批处理大小和并发数
  • 维度选择:从完整维度开始,根据实际效果需求逐步降维
  • 缓存策略:对重复文本使用缓存,能显著提升系统响应速度

下一步探索方向:

  1. 尝试结合其他模型,构建多模态理解系统
  2. 探索在移动设备上的部署方案
  3. 研究如何用embeddinggemma增强现有的搜索和推荐系统
  4. 测试在不同语言和领域文本上的表现

embeddinggemma-300m虽然小巧,但能力不容小觑。它的价值在于让高质量的文本嵌入能力变得触手可及,不再需要依赖云端服务或昂贵硬件。无论你是个人开发者、创业团队,还是企业中的技术探索者,都可以从这个模型开始,构建属于自己的智能文本处理能力。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

转载自CSDN-专业IT技术社区

原文链接:https://blog.csdn.net/weixin_42124497/article/details/156830538

评论

赞0

评论列表

微信小程序
QQ小程序

关于作者

点赞数:0
关注数:0
粉丝:0
文章:0
关注标签:0
加入于:--