"""
PostgreSQL query processor
--------------------------
"""
import re
__all__ = ['for_tsquery']
_tsquery_tokens_re = re.compile(r'(:\*|\*|&|!|\||AND|OR|NOT|-|\(|\))', re.U)
_whitespace_re = re.compile(r'\s+', re.U)
_token_map = {'AND': '&', 'OR': '|', 'NOT': '!', '-': '!', '*': ':*'}
[docs]def for_tsquery(text):
r"""
Tokenize text into a valid PostgreSQL to_tsquery query.
>>> for_tsquery(" ")
''
>>> for_tsquery("This is a test")
"'This is a test'"
>>> for_tsquery('Match "this AND phrase"')
"'Match this'&'phrase'"
>>> for_tsquery('Match "this & phrase"')
"'Match this'&'phrase'"
>>> for_tsquery("This NOT that")
"'This'&!'that'"
>>> for_tsquery("This & NOT that")
"'This'&!'that'"
>>> for_tsquery("This > that")
"'This > that'"
>>> for_tsquery("Ruby AND (Python OR JavaScript)")
"'Ruby'&('Python'|'JavaScript')"
>>> for_tsquery("Ruby AND NOT (Python OR JavaScript)")
"'Ruby'&!('Python'|'JavaScript')"
>>> for_tsquery("Ruby NOT (Python OR JavaScript)")
"'Ruby'&!('Python'|'JavaScript')"
>>> for_tsquery("Ruby (Python OR JavaScript) Golang")
"'Ruby'&('Python'|'JavaScript')&'Golang'"
>>> for_tsquery("Ruby (Python OR JavaScript) NOT Golang")
"'Ruby'&('Python'|'JavaScript')&!'Golang'"
>>> for_tsquery("Java*")
"'Java':*"
>>> for_tsquery("Java**")
"'Java':*"
>>> for_tsquery("Android || Python")
"'Android'|'Python'"
>>> for_tsquery("Missing (bracket")
"'Missing'&('bracket')"
>>> for_tsquery("Extra bracket)")
"('Extra bracket')"
>>> for_tsquery("Android (Python ())")
"'Android'&('Python')"
>>> for_tsquery("Android (Python !())")
"'Android'&('Python')"
>>> for_tsquery("()")
''
>>> for_tsquery("(")
''
>>> for_tsquery("() Python")
"'Python'"
>>> for_tsquery("!() Python")
"'Python'"
>>> for_tsquery("*")
''
>>> for_tsquery("/etc/passwd\x00")
"'/etc/passwd'"
"""
tokens = [
_token_map.get(t, t)
for t in _tsquery_tokens_re.split(
_whitespace_re.sub(
' ', text.replace("'", " ").replace('"', ' ').replace('\0', '')
)
)
]
tokens = [
t if t in ('&', '|', '!', ':*', '(', ')', ' ') else "'" + t.strip() + "'"
for t in tokens
]
tokens = [t for t in tokens if t not in ('', ' ', "''")]
if not tokens:
return ''
counterlength = len(tokens)
counter = 1
while counter < counterlength:
if tokens[counter] == '!' and tokens[counter - 1] not in ('&', '|', '('):
tokens.insert(counter, '&')
counter += 1
counterlength += 1
elif tokens[counter] == '(' and tokens[counter - 1] not in ('&', '|', '!'):
tokens.insert(counter, '&')
counter += 1
counterlength += 1
elif tokens[counter] == ')' and tokens[counter - 1] == '(':
# Empty ()
tokens.pop(counter)
tokens.pop(counter - 1)
counter -= 2
counterlength -= 2
# Pop the join with previous segment too
if tokens and tokens[counter] in ('&', '|'):
tokens.pop(counter)
counter -= 1
counterlength -= 1
elif tokens and counter == 0 and tokens[counter] == '!':
tokens.pop(counter)
counter -= 1
counterlength -= 1
elif (
tokens
and counter > 0
and tokens[counter - 1 : counter + 1] in (['&', '!'], ['|', '!'])
):
tokens.pop(counter)
tokens.pop(counter - 1)
counter -= 2
counterlength -= 2
elif tokens[counter].startswith("'") and tokens[counter - 1] not in (
'&',
'|',
'!',
'(',
):
tokens.insert(counter, '&')
counter += 1
counterlength += 1
elif (
(tokens[counter] in ('&', '|') and tokens[counter - 1] in ('&', '|'))
or (tokens[counter] == '!' and tokens[counter - 1] not in ('&', '|'))
or (tokens[counter] == ':*' and not tokens[counter - 1].startswith("'"))
):
# Invalid token: is a dupe or follows a token it shouldn't follow
tokens.pop(counter)
counter -= 1
counterlength -= 1
counter += 1
while tokens and tokens[0] in ('&', '|', ':*', ')', '!', '*'):
tokens.pop(0) # Can't start with a binary or suffix operator
if tokens:
while tokens and tokens[-1] in ('&', '|', '!', '('):
tokens.pop(-1) # Can't end with a binary or prefix operator
if not tokens:
return '' # Did we just eliminate all tokens?
missing_brackets = sum(1 if t == '(' else -1 for t in tokens if t in ('(', ')'))
if missing_brackets > 0:
tokens.append(')' * missing_brackets)
elif missing_brackets < 0:
tokens.insert(0, '(' * -missing_brackets)
return ''.join(tokens)