Browse Source

WIP: server and repl. Code cleanup.

master
J. David Lee 4 years ago
parent
commit
4bb5f5ea66
12 changed files with 78 additions and 48 deletions
  1. +11
    -10
      README.md
  2. +1
    -0
      errors/errors.go
  3. +4
    -0
      network/reader.go
  4. +4
    -4
      node/exec.go
  5. +16
    -5
      node/server.go
  6. +2
    -2
      parser/parse-delete.go
  7. +3
    -3
      parser/parse-delete_test.go
  8. +1
    -1
      parser/parse-upsert.go
  9. +3
    -3
      parser/parse-upsert_test.go
  10. +11
    -10
      ph/const.go
  11. +11
    -3
      ph/stmts.go
  12. +11
    -7
      photon/repl.go

+ 11
- 10
README.md View File

@ -9,26 +9,27 @@
### Roadmap
* [X] UPSERT INTO query's path should be a string.
* [X] TS() should be a function taking a string and returning a timestamp
* [ ] Basic query parsing on the client to determine if files need to be sent.
* [ ] UPSERT INTO query in repl and server.
* [X] Basic query parsing on the client to determine if files need to be sent
* [X] UPSERT INTO query in repl and server.
* [X] `node`: Create server that can handle a raw query request
* [X] REPL: readline to put full statements into history
* [X] table namespacing within datasets?
* [ ] SHOW DATASETS
* [ ] SHOW TABLES FROM dataset
* [ ] Schema contains dataset and table names
* [ ] `client<-->node`: create table
* [ ] `client<-->node`: drop table
* [ ] `client<-->node`: undrop table
* [ ] `client<-->node`: show schema
* [ ] Additional statement types
* [X] SHOW DATASETS
* [X] SHOW TABLES FROM dataset
* [X] DELETE FROM
* [ ] SELECT RANGE
* [ ] SELECT RESAMPLE
* [ ] CSV files should be sent w/ gzip compression (configurable level).
#### Later
* [ ] UPSERT INTO should return inserted row count
* [ ] UNDROP with existing table/dataset returns unexpected - can do better
* [ ] Add LIMIT clause to resample to limit the number of groups returned
* [ ] `executor` distinct query
* [ ] `datatable` tests for error conditions


+ 1
- 0
errors/errors.go View File

@ -87,6 +87,7 @@ var (
UnorderedTS = newError("UnorderedTS")
InvalidTS = newError("InvalidTS")
Duplicate = newError("Duplicate")
NoFileSent = newError("NoFileSent")
TableEmpty = newError("TableEmpty")


+ 4
- 0
network/reader.go View File

@ -75,6 +75,10 @@ func (r reader) ReadFile(outPath string) error {
return err
}
if size == 0 {
return errors.NoFileSent
}
f, err := os.Create(outPath)
if err != nil {
return errors.Unexpected.WithErr(err)


+ 4
- 4
node/exec.go View File

@ -24,10 +24,10 @@ func (n *Node) ExecStmt(stmt interface{}) (result interface{}, err error) {
case ph.ShowSchemaStmt:
return n.execShowSchema(x)
case ph.UpsertStmt:
case ph.UpsertIntoStmt:
return n.execUpsert(x)
case ph.DeleteStmt:
case ph.DeleteFromStmt:
return n.execDelete(x)
case ph.RangeStmt:
@ -62,7 +62,7 @@ func (n *Node) execShowSchema(stmt ph.ShowSchemaStmt) (interface{}, error) {
return n.store.TableGetSchema(stmt.TableName)
}
func (n *Node) execUpsert(stmt ph.UpsertStmt) (interface{}, error) {
func (n *Node) execUpsert(stmt ph.UpsertIntoStmt) (interface{}, error) {
schema, err := n.store.TableGetSchema(stmt.TableName)
if err != nil {
return nil, err
@ -71,7 +71,7 @@ func (n *Node) execUpsert(stmt ph.UpsertStmt) (interface{}, error) {
return nil, err
}
func (n *Node) execDelete(stmt ph.DeleteStmt) (interface{}, error) {
func (n *Node) execDelete(stmt ph.DeleteFromStmt) (interface{}, error) {
_, err := n.store.SegmentDelete(stmt.TableName, stmt.Day)
return nil, err
}


+ 16
- 5
node/server.go View File

@ -142,15 +142,19 @@ func (n *Node) handleRawQuery(c net.Conn) bool {
respType = ph.RespTypeShowSchema
resp, err = n.showSchema(x)
case ph.UpsertStmt:
case ph.UpsertIntoStmt:
filePath, err = n.getFile(c)
if err != nil {
break
return n.writeErr(c, err)
}
respType = ph.RespTypeUpsert
respType = ph.RespTypeUpsertInto
resp, err = n.upsert(x, filePath)
_ = os.Remove(filePath)
case ph.DeleteFromStmt:
respType = ph.RespTypeDeleteFrom
resp, err = n.delete(x)
default:
err = errors.Unexpected.WithMsg("Query wasn't understood.")
}
@ -243,7 +247,7 @@ func (n *Node) showSchema(stmt ph.ShowSchemaStmt) (interface{}, error) {
// ----------------------------------------------------------------------------
func (n *Node) upsert(
stmt ph.UpsertStmt,
stmt ph.UpsertIntoStmt,
localPath string,
) (
interface{},
@ -259,9 +263,16 @@ func (n *Node) upsert(
schema,
stmt.Day,
localPath)
return ph.UpsertResult{
return ph.UpsertIntoResult{
Dataset: stmt.Dataset,
TableName: stmt.TableName,
Day: stmt.Day,
}, err
}
// ----------------------------------------------------------------------------
func (n *Node) delete(stmt ph.DeleteFromStmt) (interface{}, error) {
_, err := n.store.SegmentDelete(stmt.Dataset, stmt.TableName, stmt.Day)
return ph.DeleteFromResult(stmt), err
}

+ 2
- 2
parser/parse-delete.go View File

@ -15,7 +15,7 @@ func parseDelete(
return nil, err
}
day, err := parseDayStr(s)
day, err := parseDay(s)
if err != nil {
return nil, err
}
@ -24,7 +24,7 @@ func parseDelete(
return nil, err
}
return ph.DeleteStmt{
return ph.DeleteFromStmt{
Dataset: dataset,
TableName: tableName,
Day: day,


+ 3
- 3
parser/parse-delete_test.go View File

@ -13,14 +13,14 @@ func TestParseDelete(t *testing.T) {
type TestCase struct {
in string
out ph.DeleteStmt
out ph.DeleteFromStmt
err error
}
cases := []TestCase{
{
in: `DELETE FROM test.table1 ON "2018-06-10"`,
out: ph.DeleteStmt{
out: ph.DeleteFromStmt{
Dataset: "test",
TableName: "table1",
Day: 20180610,
@ -51,7 +51,7 @@ func TestParseDelete(t *testing.T) {
if err != nil {
continue
}
stmt := i.(ph.DeleteStmt)
stmt := i.(ph.DeleteFromStmt)
if !reflect.DeepEqual(stmt, tc.out) {
t.Fatalf("%s %v != %v", tc.in, i, tc.out)
}


+ 1
- 1
parser/parse-upsert.go View File

@ -33,7 +33,7 @@ func parseUpsert(
return nil, err
}
return ph.UpsertStmt{
return ph.UpsertIntoStmt{
Dataset: dataset,
TableName: tableName,
Day: day,


+ 3
- 3
parser/parse-upsert_test.go View File

@ -13,14 +13,14 @@ func TestParseUpsert(t *testing.T) {
type TestCase struct {
in string
out ph.UpsertStmt
out ph.UpsertIntoStmt
err error
}
cases := []TestCase{
{
in: `UPSERT INTO test.table_name ON 2018-12-21 FROM CSV "some/path"`,
out: ph.UpsertStmt{
out: ph.UpsertIntoStmt{
Dataset: "test",
TableName: "table_name",
Day: 20181221,
@ -61,7 +61,7 @@ func TestParseUpsert(t *testing.T) {
if err != nil {
continue
}
stmt := i.(ph.UpsertStmt)
stmt := i.(ph.UpsertIntoStmt)
if !reflect.DeepEqual(stmt, tc.out) {
t.Fatalf("%v != %v", i, tc.out)
}


+ 11
- 10
ph/const.go View File

@ -8,14 +8,15 @@ const (
RespTypeSendFile = 4000
RespTypeError = 4001
RespTypeCreateDataset = 4002 // CREATE DATASET
RespTypeDropDataset = 4003 // DROP DATASET
RespTypeUndropDataset = 4004 // UNDROP DATASET
RespTypeShowDatasets = 4005 // SHOW DATASETS
RespTypeCreateTable = 4006 // CREATE TABLE
RespTypeShowTables = 4007 // SHOW TABLES
RespTypeDropTable = 4008 // DROP TABLE
RespTypeUndropTable = 4009 // UNDROP TABLE
RespTypeShowSchema = 4010 // SHOW SCHEMA
RespTypeUpsert = 4011 // UPSERT INTO
RespTypeCreateDataset = 4010 // CREATE DATASET
RespTypeDropDataset = 4020 // DROP DATASET
RespTypeUndropDataset = 4030 // UNDROP DATASET
RespTypeShowDatasets = 4040 // SHOW DATASETS
RespTypeCreateTable = 4050 // CREATE TABLE
RespTypeShowTables = 4060 // SHOW TABLES
RespTypeDropTable = 4070 // DROP TABLE
RespTypeUndropTable = 4080 // UNDROP TABLE
RespTypeShowSchema = 4090 // SHOW SCHEMA
RespTypeUpsertInto = 4100 // UPSERT INTO
RespTypeDeleteFrom = 4110 // DELETE FROM
)

+ 11
- 3
ph/stmts.go View File

@ -113,14 +113,14 @@ type ShowSchemaResult struct {
// ----------------------------------------------------------------------------
// UPSERT INTO <dataset>.<tableName> ON "YYYY-MM-DD" FROM CSV "<csvPath>"
type UpsertStmt struct {
type UpsertIntoStmt struct {
Dataset string
TableName string
Day int32
CSVPath string
}
type UpsertResult struct {
type UpsertIntoResult struct {
Dataset string
TableName string
Day int32
@ -131,15 +131,23 @@ type UpsertResult struct {
// TODO: Update columns (from CSV or expressions)
// ----------------------------------------------------------------------------
// TODO: Append to day from CSV
// ----------------------------------------------------------------------------
// DELETE FROM <dataset>.<tableName> ON <day>
type DeleteStmt struct {
type DeleteFromStmt struct {
Dataset string
TableName string
Day int32
}
type DeleteFromResult DeleteFromStmt
// ----------------------------------------------------------------------------
// SELECT RANGE FROM <dataset>.<tableName>
type RangeStmt struct {
Dataset string


+ 11
- 7
photon/repl.go View File

@ -105,8 +105,11 @@ func Main() {
case ph.RespTypeShowSchema:
resp = &ph.ShowSchemaResult{}
case ph.RespTypeUpsert:
resp = &ph.UpsertResult{}
case ph.RespTypeUpsertInto:
resp = &ph.UpsertIntoResult{}
case ph.RespTypeDeleteFrom:
resp = &ph.DeleteFromResult{}
default:
log.Fatalf("Response type wasn't understood.")
@ -176,11 +179,12 @@ func printResult(resp interface{}) {
fmt.Println("")
printCols(x.Cols)
case *ph.UpsertResult:
fmt.Printf("Dataset: %s\n", x.Dataset)
fmt.Printf("TableName: %s\n", x.TableName)
fmt.Printf("Day: %d\n", x.Day)
fmt.Printf("Rows: %d\n", x.Inserted)
case *ph.UpsertIntoResult:
fmt.Printf("Inserted %d rows into %s.%s on %d.\n",
x.Inserted, x.Dataset, x.TableName, x.Day)
case *ph.DeleteFromResult:
fmt.Printf("Deleted from %s.%s on %d.\n", x.Dataset, x.TableName, x.Day)
default:
fmt.Printf("Result wasn't understood.")


Loading…
Cancel
Save