#!/usr/bin/env python3
import collections # for OrderedDict
import shlex
from itertools import takewhile
import re
import sys, os
from . import imagelist
from .metapho import MetaphoImage
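# os.path.commonprefix compares strings character by character instead of
# by path component, so its result may not be a real directory; see
# http://rosettacode.org/wiki/Find_common_directory_path#Python
# A replacement that compares whole path components: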
def commonprefix(paths):
    """Return the leading path components shared by every path in paths.

    Each path is split on os.path.sep and compared one component at a
    time, so the result always ends on a component boundary.
    Components are collected until the first level at which the paths
    disagree (or the shortest path runs out), then re-joined with
    os.path.sep.  Returns '' if paths is empty or if nothing beyond a
    leading separator is shared.
    """
    split_paths = [p.split(os.path.sep) for p in paths]

    shared = []
    for level in zip(*split_paths):
        first = level[0]
        if any(name != first for name in level[1:]):
            break
        shared.append(first)

    return os.path.sep.join(shared)
# Name of the default tag category.
# NOTE(review): nothing in the code visible here references this constant;
# presumably it is used by tag-file reading code elsewhere -- confirm.
DEFAULT_CAT = "Tags"
class Tagger(object):
    """Manages tags for images.

    Tag strings live in self.tag_list; a tag's index in that list is its
    tag number.  Each image (a MetaphoImage held by the imagelist module)
    carries a list of tag numbers in img.tags, and self.categories maps
    each category name to the list of tag numbers in that category.
    """

    # Extensions we explicitly don't handle that might nevertheless
    # be in the same directory as images.
    # You can set up your own personal list of extensions to skip by
    # setting NOTAGS_SKIP_EXTENSIONS to a whitespace-separated list.
    try:
        SKIP_EXTENSIONS = os.getenv("NOTAGS_SKIP_EXTENSIONS").split()
    except AttributeError:
        # os.getenv returned None: the variable isn't set.
        SKIP_EXTENSIONS = [
            ".cr2", ".arw", ".xcf",
            ".mvi", ".avi", ".mov", ".thm", ".mp4", ".mkv",
            ".pto", ".txt", ".wav", ".mp3",
            ".xml", ".pp3"
        ]

    # Directory name patterns (regular expressions) that are never indexed.
    # Override with a whitespace-separated list in NOTAGS_IGNORE_DIRNAMES.
    try:
        IGNORE_DIRNAMES = os.getenv("NOTAGS_IGNORE_DIRNAMES").split()
    except AttributeError:
        IGNORE_DIRNAMES = [ "html", "web", "bad", ".*_assets$" ]

    def __init__(self):
        """tagger: an object to manage metapho image tags"""

        # The actual per-image lists of tags live in the MetaphoImage class.
        # Each image has img.tags, which is a list of tag indices.

        # The category list is an OrderedDict
        # { "First category": [ 3, 5, 11 ] }
        # means category 0 has the name "First category" and includes
        # tags 3, 5 and 11 from the tag_list.
        self.categories = collections.OrderedDict()

        # The tag list is a list of all tags we know about (strings).
        # A tag may be in several categories.
        # The index of a tag in this list is the tag number.
        self.tag_list = []

        # Files from which we've read tags (named Tags or Keywords)
        self.tagfiles = []
        # the directory common to them, where we'll try to store tags
        self.commondir = None

        # Have any tags changed during this run?
        # Don't update the Tags file if the user doesn't change anything.
        self.changed = False
        self.force_write = False

        # What category are we currently processing?
        self.current_category = ''

        # All the Tags files we read to initialize.
        # We don't necessarily use this, but callers might want to know.
        self.all_tags_files = []

    def __repr__(self):
        """Returns a string summarizing all known images and tags,
        suitable for printing on stdout or pasting into a Tags file.
        Don't include images that no longer exist on disk.

        This is also the text write_tag_file() saves (via str(self)).
        Filenames inside self.commondir are written relative to it;
        filenames containing a space are double-quoted.
        """
        if not self.categories:
            return ''

        # Only strip commondir when it is a real, non-empty directory prefix
        # that ends on a path-component boundary: otherwise "/a/b" would
        # also "match" "/a/bc/x.jpg", and an empty or unset commondir would
        # chop the first character off every filename (or crash on None).
        if self.commondir:
            dirprefix = self.commondir.rstrip(os.path.sep) + os.path.sep
        else:
            dirprefix = ''

        images = imagelist.image_list()

        # An image may carry many tags; only stat each file once.
        exists_cache = {}

        def on_disk(path):
            if path not in exists_cache:
                exists_cache[path] = os.path.exists(path)
            return exists_cache[path]

        chunks = []
        for cat in sorted(self.categories):
            chunks.append('\ncategory ' + cat + '\n\n')

            # self.categories[cat] is a list of numeric tag indices,
            # so sorting tags would require a lot more code
            # and there's no particular reason to.
            for tagno in self.categories[cat]:
                tagstr = self.tag_list[tagno]

                # No empty tag strings
                if not tagstr.strip():
                    continue

                # Does the image still exist on disk?
                # can't rely on img.invalid because that's only set
                # if the user has tried to view that image.
                imglist = [img for img in images
                           if tagno in img.tags and on_disk(img.filename)]

                # Sort them by the images' own ordering.
                # NOTE(review): relies on MetaphoImage being orderable
                # (presumably by filename) -- confirm.
                imglist.sort()

                names = []
                for img in imglist:
                    filename = img.filename
                    if dirprefix and filename.startswith(dirprefix):
                        filename = filename[len(dirprefix):]
                    if ' ' in filename:
                        filename = '"' + filename + '"'
                    names.append(filename)

                # Tags with no surviving images are omitted entirely.
                if names:
                    chunks.append("tag %s : %s\n" % (tagstr, ' '.join(names)))

        return ''.join(chunks)

    def rename_category(self, old, new):
        """Rename category old to new, keeping its position in the
        category ordering.

        If a category named new already exists, the two are merged
        (rather than one of them silently losing its tags).  If old is
        the current category, the current category follows the rename.
        Does nothing if old isn't a known category or old == new.
        """
        if old == new or old not in self.categories:
            return

        renamed = collections.OrderedDict()
        for name, tagnos in self.categories.items():
            key = new if name == old else name
            if key not in renamed:
                renamed[key] = tagnos
                continue
            # Name collision: merge without duplicating tag numbers.
            merged = renamed[key]
            for tagno in tagnos:
                if tagno not in merged:
                    merged.append(tagno)

        # Mutate the existing OrderedDict in place so anyone holding
        # a reference to self.categories sees the change.
        self.categories.clear()
        self.categories.update(renamed)

        if self.current_category == old:
            self.current_category = new

        # Category names are part of what __repr__ writes out.
        self.changed = True

    def write_tag_file(self):
        """Save the current set of tags to a Tags file chosen from
        the top-level directory used in the images we've seen.
        If there was a previous Tags file there, it will be saved
        as Tags.bak.

        Nothing is written unless something changed (or force_write
        is set) and at least one tag exists.
        """
        if not self.changed and not self.force_write:
            print("No tags changed; not rewriting Tags file")
            return

        if not self.tag_list:
            print("Nothing was tagged; not writing Tags file")
            return

        # An unset commondir is treated like an empty one:
        # the file goes in the current directory.
        outpath = os.path.join(self.commondir or '', "Tags")

        print("Saving to", outpath)

        # Render first, so that a failure while building the text
        # can't leave us with a .bak and no Tags file.
        contents = str(self)

        if os.path.exists(outpath):
            # os.replace overwrites an existing .bak on every platform.
            os.replace(outpath, outpath + ".bak")

        with open(outpath, "w") as outfile:
            outfile.write(contents)

    def check_commondir(self, d):
        """Keep track of the dir common to all directories we use:
        the first directory seen becomes commondir, and each later one
        narrows it to the path components they share.
        XXX commondir code is still somewhat experimental.
        """
        if self.commondir is None:
            self.commondir = d
        else:
            # Not os.path.commonprefix, which compares by character
            # rather than by path component.
            self.commondir = commonprefix([self.commondir, d])

    def process_tag(self, tagname, filenames):
        """After reading a tag from a tags file, add it to the global
        tags list if it isn't there already, and add the given filenames.
        Filenames can be relpaths or absolute normpaths.

        The tag is filed under self.current_category.  Filenames that
        match no known image are added to the image list as
        non-displayed images carrying this tag.
        """
        try:
            tagindex = self.tag_list.index(tagname)
        except ValueError:
            tagindex = len(self.tag_list)
            self.tag_list.append(tagname)

        # The same tag can turn up more than once (e.g. in several
        # Tags files); list it in the category only once.
        cat_tags = self.categories.setdefault(self.current_category, [])
        if tagindex not in cat_tags:
            cat_tags.append(tagindex)

        # Search for images matching the names in filenames.
        for fil in filenames:
            tagged = False
            for img in imagelist.image_list():
                # NOTE(review): this is a plain string-suffix match, so
                # "xa.jpg" also matches an image whose relpath is "a.jpg".
                if fil.endswith(img.relpath):
                    if tagindex not in img.tags:
                        img.tags.append(tagindex)
                    tagged = True
                    break

            # Did we find an image matching fil?
            # If not, add it as a non-displayed image.
            # This isn't needed when running on an explicit image list,
            # but it's needed for programs like notags that display
            # images with particular tags.
            if not tagged:
                newim = MetaphoImage(fil, displayed=False)
                newim.tags.append(tagindex)
                imagelist.add_images(newim)

    def add_tag(self, tag, img, category=None):
        """Add a tag to the given image,
        in the given category or else the current one.
        img is a metapho.MetaphoImage.
        tag may be a string, which can be a new string or an existing one,
        or an integer index into the tag list.
        Return the index (in the global tags list) of the tag just added.

        The category is created if it doesn't exist yet.  An integer
        tag is only added to the image, not to the category.
        """
        self.changed = True

        if not category:
            category = self.current_category

        cat_tags = self.categories.setdefault(category, [])

        if isinstance(tag, int):
            if tag not in img.tags:
                img.tags.append(tag)
            return tag

        # Else it's a string. Is it already in the tag list?
        if tag in self.tag_list:
            tagno = self.tag_list.index(tag)
        else:
            # Make a new tag.
            self.tag_list.append(tag)
            tagno = len(self.tag_list) - 1

        if tagno not in cat_tags:
            cat_tags.append(tagno)
        # Don't give the image the same tag number twice.
        if tagno not in img.tags:
            img.tags.append(tagno)
        return tagno

    def remove_tag(self, tag, img):
        """Remove a tag from the given image, if the image has it.
        tag may be an integer tag number or a tag string.

        The tag itself stays in the global tag list: tag numbers are
        indices into that list, so deleting an entry would renumber
        every later tag on every image and in every category.
        """
        self.changed = True

        if isinstance(tag, int):
            tagno = tag
        else:
            # -1 (unknown tag name) is never in img.tags.
            tagno = self.tagname_to_tagno(tag)

        if tagno in img.tags:
            img.tags.remove(tagno)

        # XXX Need to remove it from self.current_category too?

    def change_tag(self, entryno, newstr):
        """Update a tag's string.
        Called on focus_out from one of the text entries (in GUI metapho).
        The entryno should be the same as the index in the current category.
        Changes it for all categories, not just the current one.

        If entryno is beyond the tags in the current category, newstr is
        added as a new tag on the current image instead.  An empty or
        all-whitespace newstr is ignored.
        """
        # To change the tag only in the current category,
        # would have to give it a new tag number and resolve the
        # question of whether other images with this tag number
        # want the old or the new string.

        newstr = newstr.strip()
        if not newstr:
            return

        # The current category may not exist yet (nothing tagged so far).
        cat_tags = self.categories.get(self.current_category, [])

        # If it's changing an existing tag, just do it.
        if entryno < len(cat_tags):
            self.tag_list[cat_tags[entryno]] = newstr

        # The string is nonempty and doesn't change an existing tag,
        # so add a new tag.
        else:
            self.add_tag(newstr, imagelist.current_image())

        self.changed = True

    def toggle_tag(self, tagno, img):
        """Toggle tag number tagno for the given img."""
        self.changed = True

        if tagno in img.tags:
            img.tags.remove(tagno)
            return

        # It's not there yet.
        # tagno is not checked against the global tag list.
        img.tags.append(tagno)

    def tagname_to_tagno(self, tagname):
        """Given a tag name, return its index in the list. -1 if not found.
        """
        try:
            return self.tag_list.index(tagname)
        except ValueError:
            return -1

    def match_tag(self, pattern):
        """Return a list of tags matching the pattern.
        Not implemented: prints a warning on stderr and returns None.
        """
        print("*** match_tag isn't implemented yet!", file=sys.stderr)
        return None

    def tagdict_for_img(self, img):
        """Returns { catname: [tagno, tagno] }
        for the tags on img, grouped by the categories containing them.
        Categories containing none of the image's tags are omitted.
        """
        tagdict = {}
        for cat, cat_tags in self.categories.items():
            matching = [tag for tag in img.tags if tag in cat_tags]
            if matching:
                tagdict[cat] = matching
        return tagdict

    def find_untagged_files(self, topdir):
        """Return a list of untagged files and a list of directories
        in which nothing is tagged, under topdir.

        Files named Tags* or Keywords*, files with no extension, and
        files whose extension is in SKIP_EXTENSIONS are not considered.
        Individual untagged files are only reported for directories where
        at least one file is tagged; otherwise the directory is reported.
        All returned paths are absolute.
        """
        untagged_files = []
        untagged_dirs = []

        # NOTE(review): the membership test below compares path strings
        # against this list, so it presumably relies on MetaphoImage
        # comparing equal to its filename -- confirm.
        known_images = imagelist.image_list()

        for root, dirs, files in os.walk(topdir):
            # Prune in place so os.walk doesn't descend into ignored dirs.
            dirs[:] = [d for d in dirs if not self.ignore_directory(d, root)]

            some_local_tags = False
            local_untagged = []
            nfiles = 0

            for f in files:
                if f.startswith(("Tags", "Keywords")):
                    continue

                # Assume all image files will have an extension
                if '.' not in f:
                    continue

                # Filter out file extensions we know we don't handle:
                ext = os.path.splitext(f)[1]
                if ext in self.SKIP_EXTENSIONS:
                    continue

                # Now we have a file that should be tagged. Is it?
                nfiles += 1
                filepath = os.path.abspath(os.path.join(root, f))
                if filepath not in known_images:
                    local_untagged.append(filepath)
                else:
                    some_local_tags = True

            if some_local_tags:    # Something was tagged in this root
                untagged_files += local_untagged
            elif nfiles:           # There are files, but nothing was tagged
                untagged_dirs.append(os.path.abspath(root))

        return untagged_files, untagged_dirs

    def print_imagelist(self):
        """Similar to imagelist.print_imagelist()
        except it also prints tags.
        """
        print("imagelist:")
        if imagelist.img_list:
            for img in imagelist.img_list:
                # Mark the current image.
                if img == imagelist.current_image():
                    print(" >> ", end='')
                else:
                    print(" ", end='')

                print(img, "tags:",
                      '; '.join(["%d, %s" % (t, self.tag_list[t])
                                 for t in img.tags ]))
        else:
            print(" No images in imagelist yet")

        print("All tags:", self.tag_list)

        if self.current_category:
            # The current category may be named before anything is in it.
            print("Current category:",
                  self.categories.get(self.current_category, []))
        else:
            print("No category set yet")

    @classmethod
    def ignore_directory(cls, d, path=None):
        """Detect directory names that don't need to be indexed separately
        and aren't likely to have a Tags file;
        for instance, those that likely contain copies of what's in
        the parent, or small copies for a web page.
        Also, you can skip tagging by creating a file named NoTags.

        d is a directory name, path (optional) the directory containing it.
        Returns True if d matches one of the IGNORE_DIRNAMES patterns
        or if path/d/NoTags exists.
        """
        # NOTE(review): re.match only anchors at the start, so the
        # pattern "bad" also matches "badlands".  Usage() additionally
        # advertises skipping directories named like their parent
        # (yosemite/yosemite); nothing here implements that.
        for ipat in cls.IGNORE_DIRNAMES:
            if re.match(ipat, d):
                return True

        if path and os.path.exists(os.path.join(path, d, "NoTags")):
            return True

        return False

    @staticmethod
    def print_files_by_directory(filelist):
        """Given a list of pathnames, group them by which directory
        they belong to and print them in an organized way:
        each directory name (when there is one) on its own line,
        followed by a line of its sorted basenames.
        """
        dirdic = {}
        for f in filelist:
            # Split into dirname and basename:
            dn, bn = os.path.split(f)
            dirdic.setdefault(dn, []).append(bn)

        for d in sorted(dirdic):
            if d.strip():
                print(' %s:' % d)
            # Calling split_by_line_length makes for pretty, readable output.
            # However, it's split over multiple lines and thus hard to
            # paste into a metapho command. Might want to make it an option.
            print(' ', ' '.join(sorted(dirdic[d])))

    @staticmethod
    def split_by_line_length(s, linelen, prefix=''):
        """Given a long string, split it into lines no longer than linelen,
        with each line optionally prefixed, e.g. with indentation.
        Currently this splits only at spaces, not tabs.

        The prefix is not counted against linelen.  A single word longer
        than linelen can't be split, so it is left on a line of its own.
        Returns the lines joined with newlines (no trailing newline).
        """
        lines = []
        while len(s) > linelen:
            # The last space at or before index linelen.
            cut = s.rfind(' ', 0, linelen + 1)
            if cut < 0:
                # No space in range: an overlong word.
                # Break at the first space after it, if there is one.
                cut = s.find(' ', linelen + 1)
                if cut < 0:
                    break

            # Drop any run of spaces just before the break.
            lines.append(prefix + s[:cut].rstrip(' '))
            s = s[cut + 1:]

        lines.append(prefix + s)
        return '\n'.join(lines)
def Usage():
    """Print the help text for the notags script, then exit with status 0.

    The text lists the file extensions and directory-name patterns that
    are currently being skipped (Tagger.SKIP_EXTENSIONS and
    Tagger.IGNORE_DIRNAMES) and the environment variables that set them.
    """
    prog = os.path.basename(sys.argv[0])
    skipped_exts = ' '.join(Tagger.SKIP_EXTENSIONS)
    ignored_dirs = ' '.join(Tagger.IGNORE_DIRNAMES)

    lines = [
        "Usage: " + prog,
        "",
        "Find directories under the current one that have image files\n"
        "but lack a file named either Tags or Keywords.",
        "",
        prog + " will ignore files with the following extensions:",
        "  " + skipped_exts,
        " (you can configure that with an environment variable,",
        " e.g. export NOTAGS_SKIP_EXTENSIONS='.cr2 .mp3')",
        prog + " will ignore directories with these names (regex):",
        "  " + ignored_dirs,
        " (configure that with the environment variable "
        "NOTAGS_IGNORE_DIRNAMES)",
        " as well as directories with the same name "
        "as the parent directory,\n e.g. yosemite/yosemite",
        "It will also ignore any directory containing a file named NoTags.",
    ]
    print('\n'.join(lines))

    sys.exit(0)
def main():
    """The script linked as notags:
    Read tags and report any inconsistencies:
    images in the Tags file that don't exist on disk,
    images on disk that aren't in ./Tags.

    With -h or --help as the first argument, print usage and exit.
    """
    if len(sys.argv) > 1 and sys.argv[1] in ('-h', '--help'):
        Usage()

    tagger = Tagger()
    # NOTE(review): read_tags() is not among the Tagger methods visible
    # in this file; presumably it is defined elsewhere -- confirm.
    tagger.read_tags('.')
    print()

    curdir = os.path.abspath('.')
    # Match on a whole path component: a bare startswith(curdir) would
    # also accept siblings such as /x/photos2 when curdir is /x/photos,
    # and would strip one character too many when curdir is the root.
    curdir_prefix = curdir.rstrip(os.path.sep) + os.path.sep

    def rel_dirs(dirs):
        """Take absolute paths and make them relative to curdir.
        curdir itself becomes '.'; paths outside curdir are unchanged.
        """
        relative = []
        for p in dirs:
            if p == curdir:
                p = ''
            elif p.startswith(curdir_prefix):
                p = p[len(curdir_prefix):]
            relative.append(p if p else '.')
        return relative

    # tagger.all_tags_files might be interesting information,
    # but it's too long a list when evaluating a year's photos.

    nef = MetaphoImage.find_nonexistent_files()
    if nef:
        print("Tagged files that don't exist on disk:", ' '.join(rel_dirs(nef)))
        print()

    utf, utd = tagger.find_untagged_files('.')
    if utd:
        print("Directories that need a Tags file:", ' '.join(rel_dirs(utd)))
        print()

    if utf:
        print("Individual files that aren't tagged:")
        tagger.print_files_by_directory(rel_dirs(utf))
# Run the notags report only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == '__main__':
    main()