import traceback

from elasticsearch import Elasticsearch
from pymongo import MongoClient

# Connect to MongoDB.
# The original script never created `db` even though `db.user` is queried
# below (NameError). NOTE(review): the database name 'blog' is an assumption
# -- confirm against the real deployment.
_client = MongoClient()
db = _client['blog']
# Connect to Elasticsearch using the client's default constructor arguments.
_es = Elasticsearch()
# 初始化索引的Mappings设置
_index_mappings = {
"mappings"
: {
"user"
: {
"properties"
: {
"title"
: {
"type"
:
"text"
},
"name"
: {
"type"
:
"text"
},
"age"
: {
"type"
:
"integer"
}
}
},
"blogpost"
: {
"properties"
: {
"title"
: {
"type"
:
"text"
},
"body"
: {
"type"
:
"text"
},
"user_id"
: {
"type"
:
"keyword"
},
"created"
: {
"type"
:
"date"
}
}
}
}
}
# Create 'blog_index' with the mappings above unless it already exists.
# Use a plain truthiness test rather than `is not True`: an identity check
# against the True singleton misfires when the client returns a truthy
# response object (e.g. elasticsearch-py 8.x HeadApiResponse), making the
# script try to re-create an existing index.
if not _es.indices.exists(index='blog_index'):
    _es.indices.create(index='blog_index', body=_index_mappings)
# Read every document from MongoDB's `user` collection. Elasticsearch will
# auto-generate its own _id when indexing, so MongoDB's _id is excluded via
# the projection.
user_cursor = db.user.find({}, projection={'_id': False})
# Materialize the cursor into a list of dicts.
user_docs = list(user_cursor)
# Number of documents successfully indexed so far.
processed = 0
# Index each MongoDB document into Elasticsearch as type 'user'.
for _doc in user_docs:
    try:
        _es.index(index='blog_index', doc_type='user', body=_doc)
    except Exception:
        # Best-effort: report the failure and move on to the next document.
        # `Exception` (not a bare except) lets Ctrl-C / SystemExit still stop
        # the script.
        traceback.print_exc()
    else:
        processed += 1
        print('Processed: ' + str(processed), flush=True)
# Elasticsearch search is near-real-time: freshly indexed documents only
# become searchable after a refresh, so force one before querying.
_es.indices.refresh(index='blog_index')

# Query all 'user' documents.
print('Search all...', flush=True)
_query_all = {'query': {'match_all': {}}}
_searched = _es.search(index='blog_index', doc_type='user', body=_query_all)
print(_searched, flush=True)
# Print the stored source document of each hit.
for hit in _searched['hits']['hits']:
    print(hit['_source'], flush=True)
# Match query on the `name` field (mapped as text): find 'user' documents
# whose name matches "jerry", then print the raw search response.
print('Search name contains jerry.', flush=True)
_query_name_contains = dict(query=dict(match=dict(name='jerry')))
_searched = _es.search(
    body=_query_name_contains,
    doc_type='user',
    index='blog_index',
)
print(_searched, flush=True)